001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.favored;
020
021import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.HBaseInterfaceAudience;
034import org.apache.hadoop.hbase.ServerMetrics;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
039import org.apache.hadoop.hbase.master.RackManager;
040import org.apache.hadoop.hbase.master.RegionPlan;
041import org.apache.hadoop.hbase.master.ServerManager;
042import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
043import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
051import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
052
053/**
054 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
055 * assigns favored nodes for each region. There is a Primary RegionServer that hosts
056 * the region, and then there is Secondary and Tertiary RegionServers. Currently, the
057 * favored nodes information is used in creating HDFS files - the Primary RegionServer
058 * passes the primary, secondary, tertiary node addresses as hints to the
059 * DistributedFileSystem API for creating files on the filesystem. These nodes are
060 * treated as hints by the HDFS to place the blocks of the file. This alleviates the
061 * problem to do with reading from remote nodes (since we can make the Secondary
062 * RegionServer as the new Primary RegionServer) after a region is recovered. This
063 * should help provide consistent read latencies for the regions even when their
064 * primary region servers die.
065 *
066 */
067@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
068public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
069  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
070
071  private RackManager rackManager;
072  private Configuration conf;
073  private FavoredNodesManager fnm;
074
075  @Override
076  public void setConf(Configuration conf) {
077    this.conf = conf;
078  }
079
080  @Override
081  public synchronized void initialize() throws HBaseIOException {
082    super.initialize();
083    super.setConf(conf);
084    this.fnm = services.getFavoredNodesManager();
085    this.rackManager = new RackManager(conf);
086    super.setConf(conf);
087  }
088
089  @Override
090  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)  {
091    //TODO. Look at is whether Stochastic loadbalancer can be integrated with this
092    List<RegionPlan> plans = new ArrayList<>();
093    //perform a scan of the meta to get the latest updates (if any)
094    SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
095        new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
096    try {
097      snaphotOfRegionAssignment.initialize();
098    } catch (IOException ie) {
099      LOG.warn("Not running balancer since exception was thrown " + ie);
100      return plans;
101    }
102    // This is not used? Findbugs says so: Map<ServerName, ServerName> serverNameToServerNameWithoutCode = new HashMap<>();
103    Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
104    ServerManager serverMgr = super.services.getServerManager();
105    for (ServerName sn: serverMgr.getOnlineServersList()) {
106      ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
107      // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
108      serverNameWithoutCodeToServerName.put(s, sn);
109    }
110    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
111      ServerName currentServer = entry.getKey();
112      //get a server without the startcode for the currentServer
113      ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
114          currentServer.getPort(), ServerName.NON_STARTCODE);
115      List<RegionInfo> list = entry.getValue();
116      for (RegionInfo region : list) {
117        if(!FavoredNodesManager.isFavoredNodeApplicable(region)) {
118          continue;
119        }
120        List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
121        if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
122          continue; //either favorednodes does not exist or we are already on the primary node
123        }
124        ServerName destination = null;
125        //check whether the primary is available
126        destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
127        if (destination == null) {
128          //check whether the region is on secondary/tertiary
129          if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
130              currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
131            continue;
132          }
133          //the region is currently on none of the favored nodes
134          //get it on one of them if possible
135          ServerMetrics l1 = super.services.getServerManager().getLoad(
136              serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
137          ServerMetrics l2 = super.services.getServerManager().getLoad(
138              serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
139          if (l1 != null && l2 != null) {
140            if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
141              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
142            } else {
143              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
144            }
145          } else if (l1 != null) {
146            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
147          } else if (l2 != null) {
148            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
149          }
150        }
151
152        if (destination != null) {
153          RegionPlan plan = new RegionPlan(region, currentServer, destination);
154          plans.add(plan);
155        }
156      }
157    }
158    return plans;
159  }
160
161  @Override
162  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
163      List<ServerName> servers) throws HBaseIOException {
164    Map<ServerName, List<RegionInfo>> assignmentMap;
165    try {
166      FavoredNodeAssignmentHelper assignmentHelper =
167          new FavoredNodeAssignmentHelper(servers, rackManager);
168      assignmentHelper.initialize();
169      if (!assignmentHelper.canPlaceFavoredNodes()) {
170        return super.roundRobinAssignment(regions, servers);
171      }
172      // Segregate the regions into two types:
173      // 1. The regions that have favored node assignment, and where at least
174      //    one of the favored node is still alive. In this case, try to adhere
175      //    to the current favored nodes assignment as much as possible - i.e.,
176      //    if the current primary is gone, then make the secondary or tertiary
177      //    as the new host for the region (based on their current load).
178      //    Note that we don't change the favored
179      //    node assignments here (even though one or more favored node is currently
180      //    down). It is up to the balanceCluster to do this hard work. The HDFS
181      //    can handle the fact that some nodes in the favored nodes hint is down
182      //    It'd allocate some other DNs. In combination with stale settings for HDFS,
183      //    we should be just fine.
184      // 2. The regions that currently don't have favored node assignment. We will
185      //    need to come up with favored nodes assignments for them. The corner case
186      //    in (1) above is that all the nodes are unavailable and in that case, we
187      //    will note that this region doesn't have favored nodes.
188      Pair<Map<ServerName,List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
189          segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers);
190      Map<ServerName,List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
191      List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
192      assignmentMap = new HashMap<>();
193      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes,
194          servers);
195      // merge the assignment maps
196      assignmentMap.putAll(regionsWithFavoredNodesMap);
197    } catch (Exception ex) {
198      LOG.warn("Encountered exception while doing favored-nodes assignment " + ex +
199          " Falling back to regular assignment");
200      assignmentMap = super.roundRobinAssignment(regions, servers);
201    }
202    return assignmentMap;
203  }
204
205  @Override
206  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
207      throws HBaseIOException {
208    try {
209      FavoredNodeAssignmentHelper assignmentHelper =
210          new FavoredNodeAssignmentHelper(servers, rackManager);
211      assignmentHelper.initialize();
212      ServerName primary = super.randomAssignment(regionInfo, servers);
213      if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)
214          || !assignmentHelper.canPlaceFavoredNodes()) {
215        return primary;
216      }
217      List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
218      // check if we have a favored nodes mapping for this region and if so, return
219      // a server from the favored nodes list if the passed 'servers' contains this
220      // server as well (available servers, that is)
221      if (favoredNodes != null) {
222        for (ServerName s : favoredNodes) {
223          ServerName serverWithLegitStartCode = availableServersContains(servers, s);
224          if (serverWithLegitStartCode != null) {
225            return serverWithLegitStartCode;
226          }
227        }
228      }
229      List<RegionInfo> regions = new ArrayList<>(1);
230      regions.add(regionInfo);
231      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
232      primaryRSMap.put(regionInfo, primary);
233      assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
234      return primary;
235    } catch (Exception ex) {
236      LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ex +
237          " Falling back to regular assignment");
238      return super.randomAssignment(regionInfo, servers);
239    }
240  }
241
242  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
243  segregateRegionsAndAssignRegionsWithFavoredNodes(List<RegionInfo> regions,
244      List<ServerName> availableServers) {
245    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes = new HashMap<>(regions.size() / 2);
246    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>(regions.size()/2);
247    for (RegionInfo region : regions) {
248      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
249      ServerName primaryHost = null;
250      ServerName secondaryHost = null;
251      ServerName tertiaryHost = null;
252      if (favoredNodes != null) {
253        for (ServerName s : favoredNodes) {
254          ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
255          if (serverWithLegitStartCode != null) {
256            FavoredNodesPlan.Position position =
257                FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
258            if (Position.PRIMARY.equals(position)) {
259              primaryHost = serverWithLegitStartCode;
260            } else if (Position.SECONDARY.equals(position)) {
261              secondaryHost = serverWithLegitStartCode;
262            } else if (Position.TERTIARY.equals(position)) {
263              tertiaryHost = serverWithLegitStartCode;
264            }
265          }
266        }
267        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region,
268              primaryHost, secondaryHost, tertiaryHost);
269      }
270      if (primaryHost == null && secondaryHost == null && tertiaryHost == null) {
271        //all favored nodes unavailable
272        regionsWithNoFavoredNodes.add(region);
273      }
274    }
275    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
276  }
277
278  // Do a check of the hostname and port and return the servername from the servers list
279  // that matched (the favoredNode will have a startcode of -1 but we want the real
280  // server with the legit startcode
281  private ServerName availableServersContains(List<ServerName> servers, ServerName favoredNode) {
282    for (ServerName server : servers) {
283      if (ServerName.isSameAddress(favoredNode, server)) {
284        return server;
285      }
286    }
287    return null;
288  }
289
290  private void assignRegionToAvailableFavoredNode(Map<ServerName,
291      List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region, ServerName primaryHost,
292      ServerName secondaryHost, ServerName tertiaryHost) {
293    if (primaryHost != null) {
294      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
295    } else if (secondaryHost != null && tertiaryHost != null) {
296      // assign the region to the one with a lower load
297      // (both have the desired hdfs blocks)
298      ServerName s;
299      ServerMetrics tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
300      ServerMetrics secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
301      if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
302        s = secondaryHost;
303      } else {
304        s = tertiaryHost;
305      }
306      addRegionToMap(assignmentMapForFavoredNodes, region, s);
307    } else if (secondaryHost != null) {
308      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
309    } else if (tertiaryHost != null) {
310      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
311    }
312  }
313
314  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
315      RegionInfo region, ServerName host) {
316    List<RegionInfo> regionsOnServer = null;
317    if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) {
318      regionsOnServer = new ArrayList<>();
319      assignmentMapForFavoredNodes.put(host, regionsOnServer);
320    }
321    regionsOnServer.add(region);
322  }
323
324  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
325    return this.fnm.getFavoredNodes(regionInfo);
326  }
327
328  private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
329      Map<ServerName, List<RegionInfo>> assignmentMap,
330      List<RegionInfo> regions, List<ServerName> servers) throws IOException {
331    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
332    // figure the primary RSs
333    assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
334    assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
335  }
336
337  private void assignSecondaryAndTertiaryNodesForRegion(
338      FavoredNodeAssignmentHelper assignmentHelper,
339      List<RegionInfo> regions, Map<RegionInfo, ServerName> primaryRSMap) throws IOException {
340    // figure the secondary and tertiary RSs
341    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
342        assignmentHelper.placeSecondaryAndTertiaryRS(primaryRSMap);
343
344    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
345    // now record all the assignments so that we can serve queries later
346    for (RegionInfo region : regions) {
347      // Store the favored nodes without startCode for the ServerName objects
348      // We don't care about the startcode; but only the hostname really
349      List<ServerName> favoredNodesForRegion = new ArrayList<>(3);
350      ServerName sn = primaryRSMap.get(region);
351      favoredNodesForRegion.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
352          ServerName.NON_STARTCODE));
353      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
354      if (secondaryAndTertiaryNodes != null) {
355        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
356            secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE));
357        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
358            secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE));
359      }
360      regionFNMap.put(region, favoredNodesForRegion);
361    }
362    fnm.updateFavoredNodes(regionFNMap);
363  }
364
365  /*
366   * Generate Favored Nodes for daughters during region split.
367   *
368   * If the parent does not have FN, regenerates them for the daughters.
369   *
370   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
371   * The primary FN for both the daughters should be the same as parent. Inherit the secondary
372   * FN from the parent but keep it different for each daughter. Choose the remaining FN
373   * randomly. This would give us better distribution over a period of time after enough splits.
374   */
375  @Override
376  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
377      RegionInfo regionA, RegionInfo regionB) throws IOException {
378
379    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
380    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
381    helper.initialize();
382
383    List<ServerName> parentFavoredNodes = getFavoredNodes(parent);
384    if (parentFavoredNodes == null) {
385      LOG.debug("Unable to find favored nodes for parent, " + parent
386          + " generating new favored nodes for daughter");
387      result.put(regionA, helper.generateFavoredNodes(regionA));
388      result.put(regionB, helper.generateFavoredNodes(regionB));
389
390    } else {
391
392      // Lets get the primary and secondary from parent for regionA
393      Set<ServerName> regionAFN =
394          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
395      result.put(regionA, Lists.newArrayList(regionAFN));
396
397      // Lets get the primary and tertiary from parent for regionB
398      Set<ServerName> regionBFN =
399          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
400      result.put(regionB, Lists.newArrayList(regionBFN));
401    }
402
403    fnm.updateFavoredNodes(result);
404  }
405
406  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
407      List<ServerName> parentFavoredNodes, Position primary, Position secondary)
408      throws IOException {
409
410    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
411    if (parentFavoredNodes.size() >= primary.ordinal()) {
412      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
413    }
414
415    if (parentFavoredNodes.size() >= secondary.ordinal()) {
416      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
417    }
418
419    while (daughterFN.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
420      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
421      daughterFN.add(newNode);
422    }
423    return daughterFN;
424  }
425
426  /*
427   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to
428   * keep it simple.
429   */
430  @Override
431  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] mergeParents)
432      throws IOException {
433    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
434    regionFNMap.put(merged, getFavoredNodes(mergeParents[0]));
435    fnm.updateFavoredNodes(regionFNMap);
436  }
437
438  @Override
439  public List<RegionPlan> balanceCluster(TableName tableName,
440      Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
441    return balanceCluster(clusterState);
442  }
443}