001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.favored;
020
021import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
024
025import edu.umd.cs.findbugs.annotations.NonNull;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
041import org.apache.hadoop.hbase.master.RackManager;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.master.ServerManager;
044import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
045import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
053import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
054
/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
 * assigns favored nodes for each region. There is a Primary RegionServer that hosts
 * the region, and then there are Secondary and Tertiary RegionServers. Currently, the
 * favored nodes information is used in creating HDFS files - the Primary RegionServer
 * passes the primary, secondary, tertiary node addresses as hints to the
 * DistributedFileSystem API for creating files on the filesystem. These nodes are
 * treated as hints by the HDFS to place the blocks of the file. This alleviates the
 * problem of reading from remote nodes after a region is recovered (since we can make
 * the Secondary RegionServer the new Primary RegionServer). This
 * should help provide consistent read latencies for the regions even when their
 * primary region servers die.
 *
 */
069@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
070public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
071  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
072
073  private RackManager rackManager;
074  private Configuration conf;
075  private FavoredNodesManager fnm;
076
  /**
   * Remembers the given configuration in this balancer's own field. Nothing is forwarded to
   * the superclass here; {@link #initialize()} later passes the stored value to
   * {@code super.setConf}.
   *
   * @param conf the configuration to keep for later use
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
081
  /**
   * Initializes the superclass, hands it the stored configuration, then obtains the
   * {@link FavoredNodesManager} from {@code services} and builds a new {@link RackManager}
   * from the same configuration.
   *
   * @throws HBaseIOException declared by the overridden method
   */
  @Override
  public synchronized void initialize() throws HBaseIOException {
    super.initialize();
    super.setConf(conf);
    this.fnm = services.getFavoredNodesManager();
    this.rackManager = new RackManager(conf);
    // NOTE(review): this repeats the super.setConf(conf) call made above with the same
    // argument; it looks redundant -- confirm before removing.
    super.setConf(conf);
  }
090
091  @Override
092  public List<RegionPlan> balanceTable(TableName tableName,
093      Map<ServerName, List<RegionInfo>> loadOfOneTable) {
094    // TODO. Look at is whether Stochastic loadbalancer can be integrated with this
095    List<RegionPlan> plans = new ArrayList<>();
096    // perform a scan of the meta to get the latest updates (if any)
097    SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
098        new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
099    try {
100      snaphotOfRegionAssignment.initialize();
101    } catch (IOException ie) {
102      LOG.warn("Not running balancer since exception was thrown " + ie);
103      return plans;
104    }
105    // This is not used? Findbugs says so: Map<ServerName, ServerName>
106    // serverNameToServerNameWithoutCode = new HashMap<>();
107    Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
108    ServerManager serverMgr = super.services.getServerManager();
109    for (ServerName sn : serverMgr.getOnlineServersList()) {
110      ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
111      // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
112      serverNameWithoutCodeToServerName.put(s, sn);
113    }
114    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
115      ServerName currentServer = entry.getKey();
116      // get a server without the startcode for the currentServer
117      ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
118        currentServer.getPort(), ServerName.NON_STARTCODE);
119      List<RegionInfo> list = entry.getValue();
120      for (RegionInfo region : list) {
121        if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
122          continue;
123        }
124        List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
125        if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
126          continue; // either favorednodes does not exist or we are already on the primary node
127        }
128        ServerName destination = null;
129        // check whether the primary is available
130        destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
131        if (destination == null) {
132          // check whether the region is on secondary/tertiary
133          if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
134              || currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
135            continue;
136          }
137          // the region is currently on none of the favored nodes
138          // get it on one of them if possible
139          ServerMetrics l1 = super.services.getServerManager()
140              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
141          ServerMetrics l2 = super.services.getServerManager()
142              .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
143          if (l1 != null && l2 != null) {
144            if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
145              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
146            } else {
147              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
148            }
149          } else if (l1 != null) {
150            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
151          } else if (l2 != null) {
152            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
153          }
154        }
155
156        if (destination != null) {
157          RegionPlan plan = new RegionPlan(region, currentServer, destination);
158          plans.add(plan);
159        }
160      }
161    }
162    return plans;
163  }
164
165  @Override
166  @NonNull
167  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
168      List<ServerName> servers) throws HBaseIOException {
169    Map<ServerName, List<RegionInfo>> assignmentMap;
170    try {
171      FavoredNodeAssignmentHelper assignmentHelper =
172          new FavoredNodeAssignmentHelper(servers, rackManager);
173      assignmentHelper.initialize();
174      if (!assignmentHelper.canPlaceFavoredNodes()) {
175        return super.roundRobinAssignment(regions, servers);
176      }
177      // Segregate the regions into two types:
178      // 1. The regions that have favored node assignment, and where at least
179      //    one of the favored node is still alive. In this case, try to adhere
180      //    to the current favored nodes assignment as much as possible - i.e.,
181      //    if the current primary is gone, then make the secondary or tertiary
182      //    as the new host for the region (based on their current load).
183      //    Note that we don't change the favored
184      //    node assignments here (even though one or more favored node is currently
185      //    down). It is up to the balanceCluster to do this hard work. The HDFS
186      //    can handle the fact that some nodes in the favored nodes hint is down
187      //    It'd allocate some other DNs. In combination with stale settings for HDFS,
188      //    we should be just fine.
189      // 2. The regions that currently don't have favored node assignment. We will
190      //    need to come up with favored nodes assignments for them. The corner case
191      //    in (1) above is that all the nodes are unavailable and in that case, we
192      //    will note that this region doesn't have favored nodes.
193      Pair<Map<ServerName,List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
194          segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers);
195      Map<ServerName,List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
196      List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
197      assignmentMap = new HashMap<>();
198      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes,
199          servers);
200      // merge the assignment maps
201      assignmentMap.putAll(regionsWithFavoredNodesMap);
202    } catch (Exception ex) {
203      LOG.warn("Encountered exception while doing favored-nodes assignment " + ex +
204          " Falling back to regular assignment");
205      assignmentMap = super.roundRobinAssignment(regions, servers);
206    }
207    return assignmentMap;
208  }
209
210  @Override
211  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
212      throws HBaseIOException {
213    try {
214      FavoredNodeAssignmentHelper assignmentHelper =
215          new FavoredNodeAssignmentHelper(servers, rackManager);
216      assignmentHelper.initialize();
217      ServerName primary = super.randomAssignment(regionInfo, servers);
218      if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)
219          || !assignmentHelper.canPlaceFavoredNodes()) {
220        return primary;
221      }
222      List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
223      // check if we have a favored nodes mapping for this region and if so, return
224      // a server from the favored nodes list if the passed 'servers' contains this
225      // server as well (available servers, that is)
226      if (favoredNodes != null) {
227        for (ServerName s : favoredNodes) {
228          ServerName serverWithLegitStartCode = availableServersContains(servers, s);
229          if (serverWithLegitStartCode != null) {
230            return serverWithLegitStartCode;
231          }
232        }
233      }
234      List<RegionInfo> regions = new ArrayList<>(1);
235      regions.add(regionInfo);
236      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
237      primaryRSMap.put(regionInfo, primary);
238      assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
239      return primary;
240    } catch (Exception ex) {
241      LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ex +
242          " Falling back to regular assignment");
243      return super.randomAssignment(regionInfo, servers);
244    }
245  }
246
247  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
248  segregateRegionsAndAssignRegionsWithFavoredNodes(List<RegionInfo> regions,
249      List<ServerName> availableServers) {
250    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes = new HashMap<>(regions.size() / 2);
251    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>(regions.size()/2);
252    for (RegionInfo region : regions) {
253      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
254      ServerName primaryHost = null;
255      ServerName secondaryHost = null;
256      ServerName tertiaryHost = null;
257      if (favoredNodes != null) {
258        for (ServerName s : favoredNodes) {
259          ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
260          if (serverWithLegitStartCode != null) {
261            FavoredNodesPlan.Position position =
262                FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
263            if (Position.PRIMARY.equals(position)) {
264              primaryHost = serverWithLegitStartCode;
265            } else if (Position.SECONDARY.equals(position)) {
266              secondaryHost = serverWithLegitStartCode;
267            } else if (Position.TERTIARY.equals(position)) {
268              tertiaryHost = serverWithLegitStartCode;
269            }
270          }
271        }
272        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region,
273              primaryHost, secondaryHost, tertiaryHost);
274      }
275      if (primaryHost == null && secondaryHost == null && tertiaryHost == null) {
276        //all favored nodes unavailable
277        regionsWithNoFavoredNodes.add(region);
278      }
279    }
280    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
281  }
282
283  // Do a check of the hostname and port and return the servername from the servers list
284  // that matched (the favoredNode will have a startcode of -1 but we want the real
285  // server with the legit startcode
286  private ServerName availableServersContains(List<ServerName> servers, ServerName favoredNode) {
287    for (ServerName server : servers) {
288      if (ServerName.isSameAddress(favoredNode, server)) {
289        return server;
290      }
291    }
292    return null;
293  }
294
295  private void assignRegionToAvailableFavoredNode(Map<ServerName,
296      List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region, ServerName primaryHost,
297      ServerName secondaryHost, ServerName tertiaryHost) {
298    if (primaryHost != null) {
299      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
300    } else if (secondaryHost != null && tertiaryHost != null) {
301      // assign the region to the one with a lower load
302      // (both have the desired hdfs blocks)
303      ServerName s;
304      ServerMetrics tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
305      ServerMetrics secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
306      if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
307        s = secondaryHost;
308      } else {
309        s = tertiaryHost;
310      }
311      addRegionToMap(assignmentMapForFavoredNodes, region, s);
312    } else if (secondaryHost != null) {
313      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
314    } else if (tertiaryHost != null) {
315      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
316    }
317  }
318
319  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
320      RegionInfo region, ServerName host) {
321    List<RegionInfo> regionsOnServer = null;
322    if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) {
323      regionsOnServer = new ArrayList<>();
324      assignmentMapForFavoredNodes.put(host, regionsOnServer);
325    }
326    regionsOnServer.add(region);
327  }
328
329  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
330    return this.fnm.getFavoredNodes(regionInfo);
331  }
332
333  private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
334      Map<ServerName, List<RegionInfo>> assignmentMap,
335      List<RegionInfo> regions, List<ServerName> servers) throws IOException {
336    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
337    // figure the primary RSs
338    assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
339    assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
340  }
341
342  private void assignSecondaryAndTertiaryNodesForRegion(
343      FavoredNodeAssignmentHelper assignmentHelper,
344      List<RegionInfo> regions, Map<RegionInfo, ServerName> primaryRSMap) throws IOException {
345    // figure the secondary and tertiary RSs
346    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
347        assignmentHelper.placeSecondaryAndTertiaryRS(primaryRSMap);
348
349    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
350    // now record all the assignments so that we can serve queries later
351    for (RegionInfo region : regions) {
352      // Store the favored nodes without startCode for the ServerName objects
353      // We don't care about the startcode; but only the hostname really
354      List<ServerName> favoredNodesForRegion = new ArrayList<>(3);
355      ServerName sn = primaryRSMap.get(region);
356      favoredNodesForRegion.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
357          ServerName.NON_STARTCODE));
358      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
359      if (secondaryAndTertiaryNodes != null) {
360        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
361            secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE));
362        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
363            secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE));
364      }
365      regionFNMap.put(region, favoredNodesForRegion);
366    }
367    fnm.updateFavoredNodes(regionFNMap);
368  }
369
370  /*
371   * Generate Favored Nodes for daughters during region split.
372   *
373   * If the parent does not have FN, regenerates them for the daughters.
374   *
375   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
376   * The primary FN for both the daughters should be the same as parent. Inherit the secondary
377   * FN from the parent but keep it different for each daughter. Choose the remaining FN
378   * randomly. This would give us better distribution over a period of time after enough splits.
379   */
380  @Override
381  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
382      RegionInfo regionA, RegionInfo regionB) throws IOException {
383
384    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
385    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
386    helper.initialize();
387
388    List<ServerName> parentFavoredNodes = getFavoredNodes(parent);
389    if (parentFavoredNodes == null) {
390      LOG.debug("Unable to find favored nodes for parent, " + parent
391          + " generating new favored nodes for daughter");
392      result.put(regionA, helper.generateFavoredNodes(regionA));
393      result.put(regionB, helper.generateFavoredNodes(regionB));
394
395    } else {
396
397      // Lets get the primary and secondary from parent for regionA
398      Set<ServerName> regionAFN =
399          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
400      result.put(regionA, Lists.newArrayList(regionAFN));
401
402      // Lets get the primary and tertiary from parent for regionB
403      Set<ServerName> regionBFN =
404          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
405      result.put(regionB, Lists.newArrayList(regionBFN));
406    }
407
408    fnm.updateFavoredNodes(result);
409  }
410
411  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
412      List<ServerName> parentFavoredNodes, Position primary, Position secondary)
413      throws IOException {
414
415    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
416    if (parentFavoredNodes.size() >= primary.ordinal()) {
417      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
418    }
419
420    if (parentFavoredNodes.size() >= secondary.ordinal()) {
421      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
422    }
423
424    while (daughterFN.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
425      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
426      daughterFN.add(newNode);
427    }
428    return daughterFN;
429  }
430
431  /*
432   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to
433   * keep it simple.
434   */
435  @Override
436  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] mergeParents)
437      throws IOException {
438    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
439    regionFNMap.put(merged, getFavoredNodes(mergeParents[0]));
440    fnm.updateFavoredNodes(regionFNMap);
441  }
442
443}