001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.favored;
020
021import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.ServerMetrics;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
039import org.apache.hadoop.hbase.master.RackManager;
040import org.apache.hadoop.hbase.master.RegionPlan;
041import org.apache.hadoop.hbase.master.ServerManager;
042import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
051import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
052
/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
 * assigns favored nodes for each region. There is a Primary RegionServer that hosts
 * the region, and then there are Secondary and Tertiary RegionServers. Currently, the
 * favored nodes information is used in creating HDFS files - the Primary RegionServer
 * passes the primary, secondary, tertiary node addresses as hints to the
 * DistributedFileSystem API for creating files on the filesystem. These nodes are
 * treated as hints by the HDFS to place the blocks of the file. This alleviates the
 * problem of reading from remote nodes after a region is recovered (since we can make
 * the Secondary RegionServer the new Primary RegionServer). This should help provide
 * consistent read latencies for the regions even when their primary region servers die.
 */
067@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
068public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
069  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
070
071  private RackManager rackManager;
072  private Configuration conf;
073  private FavoredNodesManager fnm;
074
075  @Override
076  public void setConf(Configuration conf) {
077    this.conf = conf;
078  }
079
  /**
   * Initializes the parent balancer and then wires up the favored-nodes collaborators: the
   * {@link FavoredNodesManager} obtained from the master services and a new {@link RackManager}
   * built from the stored configuration.
   *
   * @throws HBaseIOException as declared by the overridden method; presumably raised by the
   *         parent initialization
   */
  @Override
  public synchronized void initialize() throws HBaseIOException {
    super.initialize();
    // Hand the configuration captured by setConf(Configuration) to the parent balancer.
    super.setConf(conf);
    this.fnm = services.getFavoredNodesManager();
    this.rackManager = new RackManager(conf);
    // NOTE(review): second super.setConf(conf) with the same value; it looks redundant unless the
    // parent's setConf depends on state established above -- confirm before removing.
    super.setConf(conf);
  }
088
089  @Override
090  public List<RegionPlan> balanceTable(TableName tableName,
091      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
092    // TODO. Look at is whether Stochastic loadbalancer can be integrated with this
093    List<RegionPlan> plans = new ArrayList<>();
094    // perform a scan of the meta to get the latest updates (if any)
095    SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
096        new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
097    try {
098      snaphotOfRegionAssignment.initialize();
099    } catch (IOException ie) {
100      LOG.warn("Not running balancer since exception was thrown " + ie);
101      return plans;
102    }
103    // This is not used? Findbugs says so: Map<ServerName, ServerName>
104    // serverNameToServerNameWithoutCode = new HashMap<>();
105    Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
106    ServerManager serverMgr = super.services.getServerManager();
107    for (ServerName sn : serverMgr.getOnlineServersList()) {
108      ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
109      // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
110      serverNameWithoutCodeToServerName.put(s, sn);
111    }
112    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
113      ServerName currentServer = entry.getKey();
114      // get a server without the startcode for the currentServer
115      ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
116        currentServer.getPort(), ServerName.NON_STARTCODE);
117      List<RegionInfo> list = entry.getValue();
118      for (RegionInfo region : list) {
119        if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
120          continue;
121        }
122        List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
123        if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
124          continue; // either favorednodes does not exist or we are already on the primary node
125        }
126        ServerName destination = null;
127        // check whether the primary is available
128        destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
129        if (destination == null) {
130          // check whether the region is on secondary/tertiary
131          if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
132              || currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
133            continue;
134          }
135          // the region is currently on none of the favored nodes
136          // get it on one of them if possible
137          ServerMetrics l1 = super.services.getServerManager()
138              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
139          ServerMetrics l2 = super.services.getServerManager()
140              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
141          if (l1 != null && l2 != null) {
142            if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
143              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
144            } else {
145              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
146            }
147          } else if (l1 != null) {
148            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
149          } else if (l2 != null) {
150            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
151          }
152        }
153
154        if (destination != null) {
155          RegionPlan plan = new RegionPlan(region, currentServer, destination);
156          plans.add(plan);
157        }
158      }
159    }
160    return plans;
161  }
162
163  @Override
164  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
165      List<ServerName> servers) throws HBaseIOException {
166    Map<ServerName, List<RegionInfo>> assignmentMap;
167    try {
168      FavoredNodeAssignmentHelper assignmentHelper =
169          new FavoredNodeAssignmentHelper(servers, rackManager);
170      assignmentHelper.initialize();
171      if (!assignmentHelper.canPlaceFavoredNodes()) {
172        return super.roundRobinAssignment(regions, servers);
173      }
174      // Segregate the regions into two types:
175      // 1. The regions that have favored node assignment, and where at least
176      //    one of the favored node is still alive. In this case, try to adhere
177      //    to the current favored nodes assignment as much as possible - i.e.,
178      //    if the current primary is gone, then make the secondary or tertiary
179      //    as the new host for the region (based on their current load).
180      //    Note that we don't change the favored
181      //    node assignments here (even though one or more favored node is currently
182      //    down). It is up to the balanceCluster to do this hard work. The HDFS
183      //    can handle the fact that some nodes in the favored nodes hint is down
184      //    It'd allocate some other DNs. In combination with stale settings for HDFS,
185      //    we should be just fine.
186      // 2. The regions that currently don't have favored node assignment. We will
187      //    need to come up with favored nodes assignments for them. The corner case
188      //    in (1) above is that all the nodes are unavailable and in that case, we
189      //    will note that this region doesn't have favored nodes.
190      Pair<Map<ServerName,List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
191          segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers);
192      Map<ServerName,List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
193      List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
194      assignmentMap = new HashMap<>();
195      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes,
196          servers);
197      // merge the assignment maps
198      assignmentMap.putAll(regionsWithFavoredNodesMap);
199    } catch (Exception ex) {
200      LOG.warn("Encountered exception while doing favored-nodes assignment " + ex +
201          " Falling back to regular assignment");
202      assignmentMap = super.roundRobinAssignment(regions, servers);
203    }
204    return assignmentMap;
205  }
206
207  @Override
208  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
209      throws HBaseIOException {
210    try {
211      FavoredNodeAssignmentHelper assignmentHelper =
212          new FavoredNodeAssignmentHelper(servers, rackManager);
213      assignmentHelper.initialize();
214      ServerName primary = super.randomAssignment(regionInfo, servers);
215      if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)
216          || !assignmentHelper.canPlaceFavoredNodes()) {
217        return primary;
218      }
219      List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
220      // check if we have a favored nodes mapping for this region and if so, return
221      // a server from the favored nodes list if the passed 'servers' contains this
222      // server as well (available servers, that is)
223      if (favoredNodes != null) {
224        for (ServerName s : favoredNodes) {
225          ServerName serverWithLegitStartCode = availableServersContains(servers, s);
226          if (serverWithLegitStartCode != null) {
227            return serverWithLegitStartCode;
228          }
229        }
230      }
231      List<RegionInfo> regions = new ArrayList<>(1);
232      regions.add(regionInfo);
233      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
234      primaryRSMap.put(regionInfo, primary);
235      assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
236      return primary;
237    } catch (Exception ex) {
238      LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ex +
239          " Falling back to regular assignment");
240      return super.randomAssignment(regionInfo, servers);
241    }
242  }
243
244  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
245  segregateRegionsAndAssignRegionsWithFavoredNodes(List<RegionInfo> regions,
246      List<ServerName> availableServers) {
247    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes = new HashMap<>(regions.size() / 2);
248    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>(regions.size()/2);
249    for (RegionInfo region : regions) {
250      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
251      ServerName primaryHost = null;
252      ServerName secondaryHost = null;
253      ServerName tertiaryHost = null;
254      if (favoredNodes != null) {
255        for (ServerName s : favoredNodes) {
256          ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
257          if (serverWithLegitStartCode != null) {
258            FavoredNodesPlan.Position position =
259                FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
260            if (Position.PRIMARY.equals(position)) {
261              primaryHost = serverWithLegitStartCode;
262            } else if (Position.SECONDARY.equals(position)) {
263              secondaryHost = serverWithLegitStartCode;
264            } else if (Position.TERTIARY.equals(position)) {
265              tertiaryHost = serverWithLegitStartCode;
266            }
267          }
268        }
269        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region,
270              primaryHost, secondaryHost, tertiaryHost);
271      }
272      if (primaryHost == null && secondaryHost == null && tertiaryHost == null) {
273        //all favored nodes unavailable
274        regionsWithNoFavoredNodes.add(region);
275      }
276    }
277    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
278  }
279
280  // Do a check of the hostname and port and return the servername from the servers list
281  // that matched (the favoredNode will have a startcode of -1 but we want the real
282  // server with the legit startcode
283  private ServerName availableServersContains(List<ServerName> servers, ServerName favoredNode) {
284    for (ServerName server : servers) {
285      if (ServerName.isSameAddress(favoredNode, server)) {
286        return server;
287      }
288    }
289    return null;
290  }
291
292  private void assignRegionToAvailableFavoredNode(Map<ServerName,
293      List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region, ServerName primaryHost,
294      ServerName secondaryHost, ServerName tertiaryHost) {
295    if (primaryHost != null) {
296      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
297    } else if (secondaryHost != null && tertiaryHost != null) {
298      // assign the region to the one with a lower load
299      // (both have the desired hdfs blocks)
300      ServerName s;
301      ServerMetrics tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
302      ServerMetrics secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
303      if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
304        s = secondaryHost;
305      } else {
306        s = tertiaryHost;
307      }
308      addRegionToMap(assignmentMapForFavoredNodes, region, s);
309    } else if (secondaryHost != null) {
310      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
311    } else if (tertiaryHost != null) {
312      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
313    }
314  }
315
316  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
317      RegionInfo region, ServerName host) {
318    List<RegionInfo> regionsOnServer = null;
319    if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) {
320      regionsOnServer = new ArrayList<>();
321      assignmentMapForFavoredNodes.put(host, regionsOnServer);
322    }
323    regionsOnServer.add(region);
324  }
325
326  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
327    return this.fnm.getFavoredNodes(regionInfo);
328  }
329
330  private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
331      Map<ServerName, List<RegionInfo>> assignmentMap,
332      List<RegionInfo> regions, List<ServerName> servers) throws IOException {
333    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
334    // figure the primary RSs
335    assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
336    assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
337  }
338
339  private void assignSecondaryAndTertiaryNodesForRegion(
340      FavoredNodeAssignmentHelper assignmentHelper,
341      List<RegionInfo> regions, Map<RegionInfo, ServerName> primaryRSMap) throws IOException {
342    // figure the secondary and tertiary RSs
343    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
344        assignmentHelper.placeSecondaryAndTertiaryRS(primaryRSMap);
345
346    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
347    // now record all the assignments so that we can serve queries later
348    for (RegionInfo region : regions) {
349      // Store the favored nodes without startCode for the ServerName objects
350      // We don't care about the startcode; but only the hostname really
351      List<ServerName> favoredNodesForRegion = new ArrayList<>(3);
352      ServerName sn = primaryRSMap.get(region);
353      favoredNodesForRegion.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
354          ServerName.NON_STARTCODE));
355      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
356      if (secondaryAndTertiaryNodes != null) {
357        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
358            secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE));
359        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
360            secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE));
361      }
362      regionFNMap.put(region, favoredNodesForRegion);
363    }
364    fnm.updateFavoredNodes(regionFNMap);
365  }
366
367  /*
368   * Generate Favored Nodes for daughters during region split.
369   *
370   * If the parent does not have FN, regenerates them for the daughters.
371   *
372   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
373   * The primary FN for both the daughters should be the same as parent. Inherit the secondary
374   * FN from the parent but keep it different for each daughter. Choose the remaining FN
375   * randomly. This would give us better distribution over a period of time after enough splits.
376   */
377  @Override
378  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
379      RegionInfo regionA, RegionInfo regionB) throws IOException {
380
381    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
382    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
383    helper.initialize();
384
385    List<ServerName> parentFavoredNodes = getFavoredNodes(parent);
386    if (parentFavoredNodes == null) {
387      LOG.debug("Unable to find favored nodes for parent, " + parent
388          + " generating new favored nodes for daughter");
389      result.put(regionA, helper.generateFavoredNodes(regionA));
390      result.put(regionB, helper.generateFavoredNodes(regionB));
391
392    } else {
393
394      // Lets get the primary and secondary from parent for regionA
395      Set<ServerName> regionAFN =
396          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
397      result.put(regionA, Lists.newArrayList(regionAFN));
398
399      // Lets get the primary and tertiary from parent for regionB
400      Set<ServerName> regionBFN =
401          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
402      result.put(regionB, Lists.newArrayList(regionBFN));
403    }
404
405    fnm.updateFavoredNodes(result);
406  }
407
408  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
409      List<ServerName> parentFavoredNodes, Position primary, Position secondary)
410      throws IOException {
411
412    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
413    if (parentFavoredNodes.size() >= primary.ordinal()) {
414      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
415    }
416
417    if (parentFavoredNodes.size() >= secondary.ordinal()) {
418      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
419    }
420
421    while (daughterFN.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
422      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
423      daughterFN.add(newNode);
424    }
425    return daughterFN;
426  }
427
428  /*
429   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to
430   * keep it simple.
431   */
432  @Override
433  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] mergeParents)
434      throws IOException {
435    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
436    regionFNMap.put(merged, getFavoredNodes(mergeParents[0]));
437    fnm.updateFavoredNodes(regionFNMap);
438  }
439
440}