001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.constraint.ConstraintException;
040import org.apache.hadoop.hbase.master.LoadBalancer;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
044import org.apache.hadoop.hbase.net.Address;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
051import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
053import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
055import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
056
057/**
058 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721)
059 * It does region balance based on a table's group membership.
060 *
061 * Most assignment methods contain two exclusive code paths: Online - when the group
062 * table is online and Offline - when it is unavailable.
063 *
064 * During Offline, assignments are assigned based on cached information in zookeeper.
065 * If unavailable (ie bootstrap) then regions are assigned randomly.
066 *
067 * Once the GROUP table has been assigned, the balancer switches to Online and will then
068 * start providing appropriate assignments for user tables.
069 *
070 */
071@InterfaceAudience.Private
072public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
073  private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
074
075  private Configuration config;
076  private ClusterMetrics clusterStatus;
077  private MasterServices masterServices;
078  private volatile RSGroupInfoManager rsGroupInfoManager;
079  private LoadBalancer internalBalancer;
080
081  /**
082   * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
083   */
084  @InterfaceAudience.Private
085  public RSGroupBasedLoadBalancer() {}
086
087  @Override
088  public Configuration getConf() {
089    return config;
090  }
091
092  @Override
093  public void setConf(Configuration conf) {
094    this.config = conf;
095    if (internalBalancer != null) {
096      internalBalancer.setConf(conf);
097    }
098  }
099
100  @Override
101  public void setClusterMetrics(ClusterMetrics sm) {
102    this.clusterStatus = sm;
103    if (internalBalancer != null) {
104      internalBalancer.setClusterMetrics(sm);
105    }
106  }
107
108  @Override
109  public void setMasterServices(MasterServices masterServices) {
110    this.masterServices = masterServices;
111  }
112
113  @Override
114  public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
115      clusterState) throws HBaseIOException {
116    return balanceCluster(clusterState);
117  }
118
119  @Override
120  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
121      throws HBaseIOException {
122    if (!isOnline()) {
123      throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
124          " is not online, unable to perform balance");
125    }
126
127    Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState);
128    List<RegionPlan> regionPlans = new ArrayList<>();
129
130    List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
131    for (RegionInfo regionInfo : misplacedRegions) {
132      ServerName serverName = findServerForRegion(clusterState, regionInfo);
133      regionPlans.add(new RegionPlan(regionInfo, serverName, null));
134    }
135    try {
136      // Record which region servers have been processed,so as to skip them after processed
137      HashSet<ServerName> processedServers = new HashSet<>();
138
139      // For each rsgroup
140      for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
141        Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
142        Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
143        for (ServerName server : clusterState.keySet()) { // for each region server
144          if (!processedServers.contains(server) // server is not processed yet
145              && rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup
146            List<RegionInfo> regionsOnServer = correctedState.get(server);
147            groupClusterState.put(server, regionsOnServer);
148
149            processedServers.add(server);
150          }
151        }
152
153        groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
154        this.internalBalancer.setClusterLoad(groupClusterLoad);
155        List<RegionPlan> groupPlans = this.internalBalancer
156            .balanceCluster(groupClusterState);
157        if (groupPlans != null) {
158          regionPlans.addAll(groupPlans);
159        }
160      }
161    } catch (IOException exp) {
162      LOG.warn("Exception while balancing cluster.", exp);
163      regionPlans.clear();
164    }
165    return regionPlans;
166  }
167
168  @Override
169  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
170    List<ServerName> servers) throws HBaseIOException {
171    Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
172    ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
173    ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
174    generateGroupMaps(regions, servers, regionMap, serverMap);
175    for (String groupKey : regionMap.keySet()) {
176      if (regionMap.get(groupKey).size() > 0) {
177        Map<ServerName, List<RegionInfo>> result = this.internalBalancer
178          .roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
179        if (result != null) {
180          if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
181            assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
182            assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
183              .addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
184          } else {
185            assignments.putAll(result);
186          }
187        }
188      }
189    }
190    return assignments;
191  }
192
193  @Override
194  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
195    List<ServerName> servers) throws HBaseIOException {
196    try {
197      Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
198      ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
199      for (RegionInfo region : regions.keySet()) {
200        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
201        if (groupName == null) {
202          LOG.debug("Group not found for table " + region.getTable() + ", using default");
203          groupName = RSGroupInfo.DEFAULT_GROUP;
204        }
205        groupToRegion.put(groupName, region);
206      }
207      for (String key : groupToRegion.keySet()) {
208        Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
209        List<RegionInfo> regionList = groupToRegion.get(key);
210        RSGroupInfo info = rsGroupInfoManager.getRSGroup(key);
211        List<ServerName> candidateList = filterOfflineServers(info, servers);
212        for (RegionInfo region : regionList) {
213          currentAssignmentMap.put(region, regions.get(region));
214        }
215        if (candidateList.size() > 0) {
216          assignments
217            .putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
218        } else {
219          if (LOG.isDebugEnabled()) {
220            LOG.debug("No available servers to assign regions: {}",
221              RegionInfo.getShortNameToLog(regionList));
222          }
223          assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
224            .addAll(regionList);
225        }
226      }
227      return assignments;
228    } catch (IOException e) {
229      throw new HBaseIOException("Failed to do online retain assignment", e);
230    }
231  }
232
233  @Override
234  public ServerName randomAssignment(RegionInfo region,
235      List<ServerName> servers) throws HBaseIOException {
236    ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
237    ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
238    generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
239    List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
240    return this.internalBalancer.randomAssignment(region, filteredServers);
241  }
242
243  private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
244    ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
245    throws HBaseIOException {
246    try {
247      for (RegionInfo region : regions) {
248        String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
249        if (groupName == null) {
250          LOG.debug("Group not found for table " + region.getTable() + ", using default");
251          groupName = RSGroupInfo.DEFAULT_GROUP;
252        }
253        regionMap.put(groupName, region);
254      }
255      for (String groupKey : regionMap.keySet()) {
256        RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
257        serverMap.putAll(groupKey, filterOfflineServers(info, servers));
258        if(serverMap.get(groupKey).size() < 1) {
259          serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
260        }
261      }
262    } catch(IOException e) {
263      throw new HBaseIOException("Failed to generate group maps", e);
264    }
265  }
266
267  private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
268                                                List<ServerName> onlineServers) {
269    if (RSGroupInfo != null) {
270      return filterServers(RSGroupInfo.getServers(), onlineServers);
271    } else {
272      LOG.warn("RSGroup Information found to be null. Some regions might be unassigned.");
273      return Collections.emptyList();
274    }
275  }
276
277  /**
278   * Filter servers based on the online servers.
279   * <p/>
280   * servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
281   * its contains()'s time complexity as O(logn), which is good enough.
282   * <p/>
283   * TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
284   * needed.
285   * @param servers the servers
286   * @param onlineServers List of servers which are online.
287   * @return the list
288   */
289  private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
290    ArrayList<ServerName> finalList = new ArrayList<>();
291    for (ServerName onlineServer : onlineServers) {
292      if (servers.contains(onlineServer.getAddress())) {
293        finalList.add(onlineServer);
294      }
295    }
296    return finalList;
297  }
298
299  private ServerName findServerForRegion(
300      Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
301    for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
302      if (entry.getValue().contains(region)) {
303        return entry.getKey();
304      }
305    }
306
307    throw new IllegalStateException("Could not find server for region "
308        + region.getShortNameToLog());
309  }
310
311  private Map<ServerName, List<RegionInfo>> correctAssignments(
312      Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{
313    Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
314    correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>());
315    for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
316      ServerName sName = assignments.getKey();
317      correctAssignments.put(sName, new LinkedList<>());
318      List<RegionInfo> regions = assignments.getValue();
319      for (RegionInfo region : regions) {
320        RSGroupInfo targetRSGInfo = null;
321        try {
322          String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
323          if (groupName == null) {
324            LOG.debug("Group not found for table " + region.getTable() + ", using default");
325            groupName = RSGroupInfo.DEFAULT_GROUP;
326          }
327          targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
328        } catch (IOException exp) {
329          LOG.debug("RSGroup information null for region of table " + region.getTable(),
330              exp);
331        }
332        if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) {
333          correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
334        } else {
335          correctAssignments.get(sName).add(region);
336        }
337      }
338    }
339    return correctAssignments;
340  }
341
342  @Override
343  public void initialize() throws HBaseIOException {
344    try {
345      if (rsGroupInfoManager == null) {
346        List<RSGroupAdminEndpoint> cps =
347          masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
348        if (cps.size() != 1) {
349          String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
350          LOG.error(msg);
351          throw new HBaseIOException(msg);
352        }
353        rsGroupInfoManager = cps.get(0).getGroupInfoManager();
354        if(rsGroupInfoManager == null){
355          String msg = "RSGroupInfoManager hasn't been initialized";
356          LOG.error(msg);
357          throw new HBaseIOException(msg);
358        }
359        rsGroupInfoManager.start();
360      }
361    } catch (IOException e) {
362      throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
363    }
364
365    // Create the balancer
366    Class<? extends LoadBalancer> balancerKlass = config.getClass(HBASE_RSGROUP_LOADBALANCER_CLASS,
367        StochasticLoadBalancer.class, LoadBalancer.class);
368    internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
369    internalBalancer.setMasterServices(masterServices);
370    if (clusterStatus != null) {
371      internalBalancer.setClusterMetrics(clusterStatus);
372    }
373    internalBalancer.setConf(config);
374    internalBalancer.initialize();
375  }
376
377  public boolean isOnline() {
378    if (this.rsGroupInfoManager == null) {
379      return false;
380    }
381
382    return this.rsGroupInfoManager.isOnline();
383  }
384
385  @Override
386  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
387  }
388
389  @Override
390  public void regionOnline(RegionInfo regionInfo, ServerName sn) {
391  }
392
393  @Override
394  public void regionOffline(RegionInfo regionInfo) {
395  }
396
397  @Override
398  public void onConfigurationChange(Configuration conf) {
399    //DO nothing for now
400  }
401
402  @Override
403  public void stop(String why) {
404  }
405
406  @Override
407  public boolean isStopped() {
408    return false;
409  }
410
411  @VisibleForTesting
412  public void setRsGroupInfoManager(RSGroupInfoManager rsGroupInfoManager) {
413    this.rsGroupInfoManager = rsGroupInfoManager;
414  }
415
416  @Override
417  public void postMasterStartupInitialize() {
418    this.internalBalancer.postMasterStartupInitialize();
419  }
420
421  public void updateBalancerStatus(boolean status) {
422    internalBalancer.updateBalancerStatus(status);
423  }
424}