001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import edu.umd.cs.findbugs.annotations.NonNull;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.TreeMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HBaseIOException;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.constraint.ConstraintException;
038import org.apache.hadoop.hbase.master.LoadBalancer;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.RegionPlan;
041import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
042import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
043import org.apache.hadoop.hbase.net.Address;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
053import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
054
/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721).
 * It balances regions based on a table's group membership.
 *
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online and Offline - when it is unavailable.
 *
 * During Offline, regions are assigned based on cached information in zookeeper.
 * If that is unavailable (i.e. bootstrap) then regions are assigned randomly.
 *
 * Once the GROUP table has been assigned, the balancer switches to Online and will then
 * start providing appropriate assignments for user tables.
 */
069@InterfaceAudience.Private
070public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
071  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
072
073  private Configuration config;
074  private ClusterMetrics clusterStatus;
075  private MasterServices masterServices;
076  private volatile RSGroupInfoManager rsGroupInfoManager;
077  private LoadBalancer internalBalancer;
078
079  /**
080   * Set this key to {@code true} to allow region fallback.
081   * Fallback to the default rsgroup first, then fallback to any group if no online servers in
082   * default rsgroup.
083   * Please keep balancer switch on at the same time, which is relied on to correct misplaced
084   * regions
085   */
086  public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";
087
088  private boolean fallbackEnabled = false;
089
090  /**
091   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
092   */
093  @InterfaceAudience.Private
094  public RSGroupBasedLoadBalancer() {}
095
096  @Override
097  public Configuration getConf() {
098    return config;
099  }
100
101  @Override
102  public void setConf(Configuration conf) {
103    this.config = conf;
104    if (internalBalancer != null) {
105      internalBalancer.setConf(conf);
106    }
107  }
108
109  @Override
110  public void setClusterMetrics(ClusterMetrics sm) {
111    this.clusterStatus = sm;
112    if (internalBalancer != null) {
113      internalBalancer.setClusterMetrics(sm);
114    }
115  }
116
117  @Override
118  public void setMasterServices(MasterServices masterServices) {
119    this.masterServices = masterServices;
120  }
121
122  /**
123   * Override to balance by RSGroup
124   * not invoke {@link #balanceTable(TableName, Map)}
125   */
126  @Override
127  public List<RegionPlan> balanceCluster(
128      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
129    if (!isOnline()) {
130      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
131          " is not online, unable to perform balance");
132    }
133    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
134    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
135      correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
136    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
137        correctedStateAndRegionPlans.getFirst();
138    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
139    // Add RegionPlan for the regions which have been placed according to the region server group
140    // assignment into the movement list
141    try {
142      // For each rsgroup
143      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
144        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
145        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
146            .entrySet()) {
147          TableName tableName = entry.getKey();
148          String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
149          if (targetRSGroupName == null) {
150            targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
151          }
152          if (targetRSGroupName.equals(rsgroup.getName())) {
153            loadOfTablesInGroup.put(tableName, entry.getValue());
154          }
155        }
156        List<RegionPlan> groupPlans = null;
157        if (!loadOfTablesInGroup.isEmpty()) {
158          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
159          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
160        }
161        if (groupPlans != null) {
162          regionPlans.addAll(groupPlans);
163        }
164      }
165    } catch (IOException exp) {
166      LOG.warn("Exception while balancing cluster.", exp);
167      regionPlans.clear();
168    }
169    return regionPlans;
170  }
171
172  @Override
173  @NonNull
174  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
175      List<ServerName> servers) throws HBaseIOException {
176    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
177    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
178      generateGroupAssignments(regions, servers);
179    for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
180      Map<ServerName, List<RegionInfo>> result =
181          this.internalBalancer.roundRobinAssignment(pair.getFirst(), pair.getSecond());
182      result.forEach((server, regionInfos) -> assignments
183          .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
184    }
185    return assignments;
186  }
187
188  @Override
189  @NonNull
190  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
191      List<ServerName> servers) throws HBaseIOException {
192    try {
193      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
194      List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
195          generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
196      for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
197        List<RegionInfo> regionList = pair.getFirst();
198        Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
199        regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
200        Map<ServerName, List<RegionInfo>> pairResult =
201            this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
202        pairResult.forEach((server, rs) -> assignments
203            .computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
204      }
205      return assignments;
206    } catch (IOException e) {
207      throw new HBaseIOException("Failed to do online retain assignment", e);
208    }
209  }
210
211  @Override
212  public ServerName randomAssignment(RegionInfo region,
213      List<ServerName> servers) throws HBaseIOException {
214    List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
215      generateGroupAssignments(Lists.newArrayList(region), servers);
216    List<ServerName> filteredServers = pairs.iterator().next().getSecond();
217    return this.internalBalancer.randomAssignment(region, filteredServers);
218  }
219
220  private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
221    List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
222    try {
223      ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
224      ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
225      RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
226      for (RegionInfo region : regions) {
227        String groupName =
228          Optional.ofNullable(rsGroupInfoManager.getRSGroupOfTable(region.getTable()))
229            .orElse(defaultInfo.getName());
230        regionMap.put(groupName, region);
231      }
232      for (String groupKey : regionMap.keySet()) {
233        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
234        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
235      }
236
237      List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
238      List<RegionInfo> fallbackRegions = Lists.newArrayList();
239      for (String groupKey : regionMap.keySet()) {
240        if (serverMap.get(groupKey).isEmpty()) {
241          fallbackRegions.addAll(regionMap.get(groupKey));
242        } else {
243          result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
244        }
245      }
246      if (!fallbackRegions.isEmpty()) {
247        List<ServerName> candidates = null;
248        if (isFallbackEnabled()) {
249          candidates = getFallBackCandidates(servers);
250        }
251        candidates = (candidates == null || candidates.isEmpty()) ?
252          Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
253        result.add(Pair.newPair(fallbackRegions, candidates));
254      }
255      return result;
256    } catch(IOException e) {
257      throw new HBaseIOException("Failed to generate group assignments", e);
258    }
259  }
260
261  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
262                                                List<ServerName> onlineServers) {
263    if (RSGroupInfo != null) {
264      return filterServers(RSGroupInfo.getServers(), onlineServers);
265    } else {
266      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
267      return Collections.emptyList();
268    }
269  }
270
271  /**
272   * Filter servers based on the online servers.
273   * <p/>
274   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
275   * its contains()'s time complexity as O(logn), which is good enough.
276   * <p/>
277   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
278   * needed.
279   * @param servers the servers
280   * @param onlineServers List of servers which are online.
281   * @return the list
282   */
283  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
284    ArrayList<ServerName> finalList = new ArrayList<>();
285    for (ServerName onlineServer : onlineServers) {
286      if (servers.contains(onlineServer.getAddress())) {
287        finalList.add(onlineServer);
288      }
289    }
290    return finalList;
291  }
292
293  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
294      correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
295          throws IOException {
296    // To return
297    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
298    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
299    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
300        .entrySet()) {
301      TableName tableName = assignments.getKey();
302      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
303      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
304      RSGroupInfo targetRSGInfo = null;
305      try {
306        String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
307        if (groupName == null) {
308          LOG.debug("Group not found for table " + tableName + ", using default");
309          groupName = RSGroupInfo.DEFAULT_GROUP;
310        }
311        targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
312      } catch (IOException exp) {
313        LOG.debug("RSGroup information null for region of table " + tableName, exp);
314      }
315      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
316        ServerName currentHostServer = serverRegionMap.getKey();
317        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
318        if (targetRSGInfo == null
319            || !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
320          regionInfoList.forEach(regionInfo -> {
321            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
322          });
323        } else {
324          correctServerRegion.put(currentHostServer, regionInfoList);
325        }
326      }
327      correctAssignments.put(tableName, correctServerRegion);
328    }
329    // Return correct assignments and region movement plan for mis-placed regions together
330    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
331        correctAssignments, regionPlansForMisplacedRegions);
332  }
333
334  @Override
335  public void initialize() throws HBaseIOException {
336    try {
337      if (rsGroupInfoManager == null) {
338        List<RSGroupAdminEndpoint> cps =
339          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
340        if (cps.size() != 1) {
341          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
342          LOG.error(msg);
343          throw new HBaseIOException(msg);
344        }
345        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
346        if(rsGroupInfoManager == null){
347          String msg = "RSGroupInfoManager hasn't been initialized";
348          LOG.error(msg);
349          throw new HBaseIOException(msg);
350        }
351        rsGroupInfoManager.start();
352      }
353    } catch (IOException e) {
354      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
355    }
356
357    // Create the balancer
358    Class<? extends LoadBalancer> balancerClass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
359        StochasticLoadBalancer.class, LoadBalancer.class);
360    if (this.getClass().isAssignableFrom(balancerClass)) {
361      LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " +
362              "falling back to the default LoadBalancer class");
363      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
364    }
365    internalBalancer = ReflectionUtils.newInstance(balancerClass, config);
366    internalBalancer.setMasterServices(masterServices);
367    if (clusterStatus != null) {
368      internalBalancer.setClusterMetrics(clusterStatus);
369    }
370    internalBalancer.setConf(config);
371    internalBalancer.initialize();
372    // init fallback groups
373    this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
374  }
375
376  public boolean isOnline() {
377    if (this.rsGroupInfoManager == null) {
378      return false;
379    }
380
381    return this.rsGroupInfoManager.isOnline();
382  }
383
384  public boolean isFallbackEnabled() {
385    return fallbackEnabled;
386  }
387
388  @Override
389  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
390  }
391
392  @Override
393  public void regionOffline(RegionInfo regionInfo) {
394  }
395
396  @Override
397  public void onConfigurationChange(Configuration conf) {
398    boolean newFallbackEnabled = conf.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
399    if (fallbackEnabled != newFallbackEnabled) {
400      LOG.info("Changing the value of {} from {} to {}", FALLBACK_GROUP_ENABLE_KEY,
401        fallbackEnabled, newFallbackEnabled);
402      fallbackEnabled = newFallbackEnabled;
403    }
404  }
405
406  @Override
407  public void stop(String why) {
408  }
409
410  @Override
411  public boolean isStopped() {
412    return false;
413  }
414
415  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
416    this.rsGroupInfoManager = rsGroupInfoManager;
417  }
418
419  @Override
420  public void postMasterStartupInitialize() {
421    this.internalBalancer.postMasterStartupInitialize();
422  }
423
424  public void updateBalancerStatus(boolean status) {
425    internalBalancer.updateBalancerStatus(status);
426  }
427
428  /**
429   * can achieve table balanced rather than overall balanced
430   */
431  @Override
432  public List<RegionPlan> balanceTable(TableName tableName,
433      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
434    if (!isOnline()) {
435      LOG.error(RSGroupInfoManager.class.getSimpleName()
436          + " is not online, unable to perform balanceTable");
437      return null;
438    }
439    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
440    loadOfThisTable.put(tableName, loadOfOneTable);
441    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
442      correctedStateAndRegionPlans;
443    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
444    try {
445      correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
446    } catch (IOException e) {
447      LOG.error("get correct assignments and mis-placed regions error ", e);
448      return null;
449    }
450    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
451        correctedStateAndRegionPlans.getFirst();
452    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
453    List<RegionPlan> tablePlans =
454        this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
455
456    if (tablePlans != null) {
457      regionPlans.addAll(tablePlans);
458    }
459    return regionPlans;
460  }
461
462  private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
463    List<ServerName> serverNames = null;
464    try {
465      RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
466      serverNames = filterOfflineServers(info, servers);
467    } catch (IOException e) {
468      LOG.error("Failed to get default rsgroup info to fallback", e);
469    }
470    return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
471  }
472}