001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.TreeMap;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.constraint.ConstraintException;
037import org.apache.hadoop.hbase.master.LoadBalancer;
038import org.apache.hadoop.hbase.master.MasterServices;
039import org.apache.hadoop.hbase.master.RegionPlan;
040import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
041import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
042import org.apache.hadoop.hbase.net.Address;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.util.ReflectionUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055
056/**
057 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721)
058 * It does region balance based on a table's group membership.
059 *
060 * Most assignment methods contain two exclusive code paths: Online - when the group
061 * table is online and Offline - when it is unavailable.
062 *
063 * During Offline, assignments are assigned based on cached information in zookeeper.
064 * If unavailable (ie bootstrap) then regions are assigned randomly.
065 *
066 * Once the GROUP table has been assigned, the balancer switches to Online and will then
067 * start providing appropriate assignments for user tables.
068 *
069 */
070@InterfaceAudience.Private
071public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
072  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
073
074  private Configuration config;
075  private ClusterMetrics clusterStatus;
076  private MasterServices masterServices;
077  private volatile RSGroupInfoManager rsGroupInfoManager;
078  private LoadBalancer internalBalancer;
079
080  /**
081   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
082   */
083  @InterfaceAudience.Private
084  public RSGroupBasedLoadBalancer() {}
085
086  @Override
087  public Configuration getConf() {
088    return config;
089  }
090
091  @Override
092  public void setConf(Configuration conf) {
093    this.config = conf;
094    if (internalBalancer != null) {
095      internalBalancer.setConf(conf);
096    }
097  }
098
099  @Override
100  public void setClusterMetrics(ClusterMetrics sm) {
101    this.clusterStatus = sm;
102    if (internalBalancer != null) {
103      internalBalancer.setClusterMetrics(sm);
104    }
105  }
106
107  @Override
108  public void setMasterServices(MasterServices masterServices) {
109    this.masterServices = masterServices;
110  }
111
112  /**
113   * Override to balance by RSGroup
114   * not invoke {@link #balanceTable(TableName, Map)}
115   */
116  @Override
117  public List<RegionPlan> balanceCluster(
118      Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
119    if (!isOnline()) {
120      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
121          " is not online, unable to perform balance");
122    }
123    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
124    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
125      correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
126    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
127        correctedStateAndRegionPlans.getFirst();
128    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
129    // Add RegionPlan for the regions which have been placed according to the region server group
130    // assignment into the movement list
131    try {
132      // For each rsgroup
133      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
134        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
135        for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
136            .entrySet()) {
137          TableName tableName = entry.getKey();
138          String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
139          if (targetRSGroupName == null) {
140            targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
141          }
142          if (targetRSGroupName.equals(rsgroup.getName())) {
143            loadOfTablesInGroup.put(tableName, entry.getValue());
144          }
145        }
146        List<RegionPlan> groupPlans = null;
147        if (!loadOfTablesInGroup.isEmpty()) {
148          LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
149          groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
150        }
151        if (groupPlans != null) {
152          regionPlans.addAll(groupPlans);
153        }
154      }
155    } catch (IOException exp) {
156      LOG.warn("Exception while balancing cluster.", exp);
157      regionPlans.clear();
158    }
159    return regionPlans;
160  }
161
162  @Override
163  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
164    List<ServerName> servers) throws HBaseIOException {
165    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
166    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
167    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
168    generateGroupMaps(regions, servers, regionMap, serverMap);
169    for (String groupKey : regionMap.keySet()) {
170      if (regionMap.get(groupKey).size() > 0) {
171        Map<ServerName, List<RegionInfo>> result = this.internalBalancer
172          .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
173        if (result != null) {
174          if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
175            assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
176            assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
177              .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
178          } else {
179            assignments.putAll(result);
180          }
181        }
182      }
183    }
184    return assignments;
185  }
186
187  @Override
188  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
189    List<ServerName> servers) throws HBaseIOException {
190    try {
191      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
192      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
193      for (RegionInfo region : regions.keySet()) {
194        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
195        if (groupName == null) {
196          LOG.debug("Group not found for table " + region.getTable() + ", using default");
197          groupName = RSGroupInfo.DEFAULT_GROUP;
198        }
199        groupToRegion.put(groupName, region);
200      }
201      for (String key : groupToRegion.keySet()) {
202        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
203        List<RegionInfo> regionList = groupToRegion.get(key);
204        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
205        List<ServerName> candidateList = filterOfflineServers(info, servers);
206        for (RegionInfo region : regionList) {
207          currentAssignmentMap.put(region, regions.get(region));
208        }
209        if (candidateList.size() > 0) {
210          assignments
211            .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
212        } else {
213          if (LOG.isDebugEnabled()) {
214            LOG.debug("No available servers to assign regions: {}",
215              RegionInfo.getShortNameToLog(regionList));
216          }
217          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
218            .addAll(regionList);
219        }
220      }
221      return assignments;
222    } catch (IOException e) {
223      throw new HBaseIOException("Failed to do online retain assignment", e);
224    }
225  }
226
227  @Override
228  public ServerName randomAssignment(RegionInfo region,
229      List<ServerName> servers) throws HBaseIOException {
230    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
231    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
232    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
233    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
234    return this.internalBalancer.randomAssignment(region, filteredServers);
235  }
236
237  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
238    ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
239    throws HBaseIOException {
240    try {
241      for (RegionInfo region : regions) {
242        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
243        if (groupName == null) {
244          LOG.debug("Group not found for table " + region.getTable() + ", using default");
245          groupName = RSGroupInfo.DEFAULT_GROUP;
246        }
247        regionMap.put(groupName, region);
248      }
249      for (String groupKey : regionMap.keySet()) {
250        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
251        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
252        if(serverMap.get(groupKey).size() < 1) {
253          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
254        }
255      }
256    } catch(IOException e) {
257      throw new HBaseIOException("Failed to generate group maps", e);
258    }
259  }
260
261  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
262                                                List<ServerName> onlineServers) {
263    if (RSGroupInfo != null) {
264      return filterServers(RSGroupInfo.getServers(), onlineServers);
265    } else {
266      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
267      return Collections.emptyList();
268    }
269  }
270
271  /**
272   * Filter servers based on the online servers.
273   * <p/>
274   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
275   * its contains()'s time complexity as O(logn), which is good enough.
276   * <p/>
277   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
278   * needed.
279   * @param servers the servers
280   * @param onlineServers List of servers which are online.
281   * @return the list
282   */
283  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
284    ArrayList<ServerName> finalList = new ArrayList<>();
285    for (ServerName onlineServer : onlineServers) {
286      if (servers.contains(onlineServer.getAddress())) {
287        finalList.add(onlineServer);
288      }
289    }
290    return finalList;
291  }
292
293  private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
294      correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
295          throws IOException {
296    // To return
297    Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
298    List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
299    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
300        .entrySet()) {
301      TableName tableName = assignments.getKey();
302      Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
303      Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
304      RSGroupInfo targetRSGInfo = null;
305      try {
306        String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
307        if (groupName == null) {
308          LOG.debug("Group not found for table " + tableName + ", using default");
309          groupName = RSGroupInfo.DEFAULT_GROUP;
310        }
311        targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
312      } catch (IOException exp) {
313        LOG.debug("RSGroup information null for region of table " + tableName, exp);
314      }
315      for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
316        ServerName currentHostServer = serverRegionMap.getKey();
317        List<RegionInfo> regionInfoList = serverRegionMap.getValue();
318        if (targetRSGInfo == null
319            || !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
320          regionInfoList.forEach(regionInfo -> {
321            regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
322          });
323        } else {
324          correctServerRegion.put(currentHostServer, regionInfoList);
325        }
326      }
327      correctAssignments.put(tableName, correctServerRegion);
328    }
329    // Return correct assignments and region movement plan for mis-placed regions together
330    return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
331        correctAssignments, regionPlansForMisplacedRegions);
332  }
333
334  @Override
335  public void initialize() throws HBaseIOException {
336    try {
337      if (rsGroupInfoManager == null) {
338        List<RSGroupAdminEndpoint> cps =
339          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
340        if (cps.size() != 1) {
341          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
342          LOG.error(msg);
343          throw new HBaseIOException(msg);
344        }
345        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
346        if(rsGroupInfoManager == null){
347          String msg = "RSGroupInfoManager hasn't been initialized";
348          LOG.error(msg);
349          throw new HBaseIOException(msg);
350        }
351        rsGroupInfoManager.start();
352      }
353    } catch (IOException e) {
354      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
355    }
356
357    // Create the balancer
358    Class<? extends LoadBalancer> balancerClass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
359        StochasticLoadBalancer.class, LoadBalancer.class);
360    if (this.getClass().isAssignableFrom(balancerClass)) {
361      LOG.warn("The internal balancer of RSGroupBasedLoadBalancer cannot be itself, " +
362              "falling back to the default LoadBalancer class");
363      balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
364    }
365    internalBalancer = ReflectionUtils.newInstance(balancerClass, config);
366    internalBalancer.setMasterServices(masterServices);
367    if (clusterStatus != null) {
368      internalBalancer.setClusterMetrics(clusterStatus);
369    }
370    internalBalancer.setConf(config);
371    internalBalancer.initialize();
372  }
373
374  public boolean isOnline() {
375    if (this.rsGroupInfoManager == null) {
376      return false;
377    }
378
379    return this.rsGroupInfoManager.isOnline();
380  }
381
382
383  @Override
384  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
385  }
386
387  @Override
388  public void regionOffline(RegionInfo regionInfo) {
389  }
390
391  @Override
392  public void onConfigurationChange(Configuration conf) {
393    //DO nothing for now
394  }
395
396  @Override
397  public void stop(String why) {
398  }
399
400  @Override
401  public boolean isStopped() {
402    return false;
403  }
404
405  @VisibleForTesting
406  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
407    this.rsGroupInfoManager = rsGroupInfoManager;
408  }
409
410  @Override
411  public void postMasterStartupInitialize() {
412    this.internalBalancer.postMasterStartupInitialize();
413  }
414
415  public void updateBalancerStatus(boolean status) {
416    internalBalancer.updateBalancerStatus(status);
417  }
418
419  /**
420   * can achieve table balanced rather than overall balanced
421   */
422  @Override
423  public List<RegionPlan> balanceTable(TableName tableName,
424      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
425    if (!isOnline()) {
426      LOG.error(RSGroupInfoManager.class.getSimpleName()
427          + " is not online, unable to perform balanceTable");
428      return null;
429    }
430    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
431    loadOfThisTable.put(tableName, loadOfOneTable);
432    Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
433      correctedStateAndRegionPlans;
434    // Calculate correct assignments and a list of RegionPlan for mis-placed regions
435    try {
436      correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
437    } catch (IOException e) {
438      LOG.error("get correct assignments and mis-placed regions error ", e);
439      return null;
440    }
441    Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
442        correctedStateAndRegionPlans.getFirst();
443    List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
444    List<RegionPlan> tablePlans =
445        this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
446
447    if (tablePlans != null) {
448      regionPlans.addAll(tablePlans);
449    }
450    return regionPlans;
451  }
452}