001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.favored;
019
020import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
021import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
023
024import edu.umd.cs.findbugs.annotations.NonNull;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import org.apache.hadoop.hbase.HBaseIOException;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.ServerMetrics;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
038import org.apache.hadoop.hbase.master.RegionPlan;
039import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
040import org.apache.hadoop.hbase.util.Pair;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
046import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
047import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
048
/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns favored
 * nodes for each region. There is a Primary RegionServer that hosts the region, and then there are
 * Secondary and Tertiary RegionServers. Currently, the favored nodes information is used when
 * creating HDFS files - the Primary RegionServer passes the primary, secondary and tertiary node
 * addresses as hints to the DistributedFileSystem API for creating files on the filesystem. HDFS
 * treats these nodes as hints for where to place the blocks of the file. This alleviates the
 * problem of reading from remote nodes after a region is recovered (since we can make the
 * Secondary RegionServer the new Primary RegionServer). This should help provide consistent read
 * latencies for the regions even when their primary region servers die.
 */
060@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
061public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
062  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
063
064  private FavoredNodesManager fnm;
065
066  @Override
067  public void setFavoredNodesManager(FavoredNodesManager fnm) {
068    this.fnm = fnm;
069  }
070
071  @Override
072  protected List<RegionPlan> balanceTable(TableName tableName,
073    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
074    // TODO. Look at is whether Stochastic loadbalancer can be integrated with this
075    List<RegionPlan> plans = new ArrayList<>();
076    Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
077    for (ServerName sn : provider.getOnlineServersList()) {
078      ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
079      // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
080      serverNameWithoutCodeToServerName.put(s, sn);
081    }
082    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
083      ServerName currentServer = entry.getKey();
084      // get a server without the startcode for the currentServer
085      ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
086        currentServer.getPort(), ServerName.NON_STARTCODE);
087      List<RegionInfo> list = entry.getValue();
088      for (RegionInfo region : list) {
089        if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
090          continue;
091        }
092        List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
093        if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
094          continue; // either favorednodes does not exist or we are already on the primary node
095        }
096        // check whether the primary is available
097        ServerName destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
098        if (destination == null) {
099          // check whether the region is on secondary/tertiary
100          if (
101            currentServerWithoutStartCode.equals(favoredNodes.get(1))
102              || currentServerWithoutStartCode.equals(favoredNodes.get(2))
103          ) {
104            continue;
105          }
106          // the region is currently on none of the favored nodes
107          // get it on one of them if possible
108          ServerMetrics l1 =
109            provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
110          ServerMetrics l2 =
111            provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
112          if (l1 != null && l2 != null) {
113            if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
114              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
115            } else {
116              destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
117            }
118          } else if (l1 != null) {
119            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
120          } else if (l2 != null) {
121            destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
122          }
123        }
124
125        if (destination != null) {
126          RegionPlan plan = new RegionPlan(region, currentServer, destination);
127          plans.add(plan);
128        }
129      }
130    }
131    return plans;
132  }
133
134  @Override
135  @NonNull
136  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
137    List<ServerName> servers) throws HBaseIOException {
138    Map<ServerName, List<RegionInfo>> assignmentMap;
139    try {
140      FavoredNodeAssignmentHelper assignmentHelper =
141        new FavoredNodeAssignmentHelper(servers, rackManager);
142      assignmentHelper.initialize();
143      if (!assignmentHelper.canPlaceFavoredNodes()) {
144        return super.roundRobinAssignment(regions, servers);
145      }
146      // Segregate the regions into two types:
147      // 1. The regions that have favored node assignment, and where at least
148      // one of the favored node is still alive. In this case, try to adhere
149      // to the current favored nodes assignment as much as possible - i.e.,
150      // if the current primary is gone, then make the secondary or tertiary
151      // as the new host for the region (based on their current load).
152      // Note that we don't change the favored
153      // node assignments here (even though one or more favored node is currently
154      // down). It is up to the balanceCluster to do this hard work. The HDFS
155      // can handle the fact that some nodes in the favored nodes hint is down
156      // It'd allocate some other DNs. In combination with stale settings for HDFS,
157      // we should be just fine.
158      // 2. The regions that currently don't have favored node assignment. We will
159      // need to come up with favored nodes assignments for them. The corner case
160      // in (1) above is that all the nodes are unavailable and in that case, we
161      // will note that this region doesn't have favored nodes.
162      Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
163        segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers);
164      Map<ServerName, List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
165      List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
166      assignmentMap = new HashMap<>();
167      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes);
168      // merge the assignment maps
169      assignmentMap.putAll(regionsWithFavoredNodesMap);
170    } catch (Exception ex) {
171      LOG.warn("Encountered exception while doing favored-nodes assignment " + ex
172        + " Falling back to regular assignment");
173      assignmentMap = super.roundRobinAssignment(regions, servers);
174    }
175    return assignmentMap;
176  }
177
178  @Override
179  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
180    throws HBaseIOException {
181    try {
182      FavoredNodeAssignmentHelper assignmentHelper =
183        new FavoredNodeAssignmentHelper(servers, rackManager);
184      assignmentHelper.initialize();
185      ServerName primary = super.randomAssignment(regionInfo, servers);
186      if (
187        !FavoredNodesManager.isFavoredNodeApplicable(regionInfo)
188          || !assignmentHelper.canPlaceFavoredNodes()
189      ) {
190        return primary;
191      }
192      List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
193      // check if we have a favored nodes mapping for this region and if so, return
194      // a server from the favored nodes list if the passed 'servers' contains this
195      // server as well (available servers, that is)
196      if (favoredNodes != null) {
197        for (ServerName s : favoredNodes) {
198          ServerName serverWithLegitStartCode = availableServersContains(servers, s);
199          if (serverWithLegitStartCode != null) {
200            return serverWithLegitStartCode;
201          }
202        }
203      }
204      List<RegionInfo> regions = new ArrayList<>(1);
205      regions.add(regionInfo);
206      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
207      primaryRSMap.put(regionInfo, primary);
208      assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
209      return primary;
210    } catch (Exception ex) {
211      LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + ex
212        + " Falling back to regular assignment");
213      return super.randomAssignment(regionInfo, servers);
214    }
215  }
216
217  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
218    segregateRegionsAndAssignRegionsWithFavoredNodes(List<RegionInfo> regions,
219      List<ServerName> availableServers) {
220    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes =
221      new HashMap<>(regions.size() / 2);
222    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>(regions.size() / 2);
223    for (RegionInfo region : regions) {
224      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
225      ServerName primaryHost = null;
226      ServerName secondaryHost = null;
227      ServerName tertiaryHost = null;
228      if (favoredNodes != null) {
229        for (ServerName s : favoredNodes) {
230          ServerName serverWithLegitStartCode = availableServersContains(availableServers, s);
231          if (serverWithLegitStartCode != null) {
232            FavoredNodesPlan.Position position =
233              FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
234            if (Position.PRIMARY.equals(position)) {
235              primaryHost = serverWithLegitStartCode;
236            } else if (Position.SECONDARY.equals(position)) {
237              secondaryHost = serverWithLegitStartCode;
238            } else if (Position.TERTIARY.equals(position)) {
239              tertiaryHost = serverWithLegitStartCode;
240            }
241          }
242        }
243        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost,
244          secondaryHost, tertiaryHost);
245      }
246      if (primaryHost == null && secondaryHost == null && tertiaryHost == null) {
247        // all favored nodes unavailable
248        regionsWithNoFavoredNodes.add(region);
249      }
250    }
251    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
252  }
253
254  // Do a check of the hostname and port and return the servername from the servers list
255  // that matched (the favoredNode will have a startcode of -1 but we want the real
256  // server with the legit startcode
257  private ServerName availableServersContains(List<ServerName> servers, ServerName favoredNode) {
258    for (ServerName server : servers) {
259      if (ServerName.isSameAddress(favoredNode, server)) {
260        return server;
261      }
262    }
263    return null;
264  }
265
266  private void assignRegionToAvailableFavoredNode(
267    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region,
268    ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
269    if (primaryHost != null) {
270      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
271    } else if (secondaryHost != null && tertiaryHost != null) {
272      // assign the region to the one with a lower load
273      // (both have the desired hdfs blocks)
274      ServerName s;
275      ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
276      ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
277      if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
278        s = secondaryHost;
279      } else {
280        s = tertiaryHost;
281      }
282      addRegionToMap(assignmentMapForFavoredNodes, region, s);
283    } else if (secondaryHost != null) {
284      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
285    } else if (tertiaryHost != null) {
286      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
287    }
288  }
289
290  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
291    RegionInfo region, ServerName host) {
292    List<RegionInfo> regionsOnServer = assignmentMapForFavoredNodes.get(host);
293    if (regionsOnServer == null) {
294      regionsOnServer = new ArrayList<>();
295      assignmentMapForFavoredNodes.put(host, regionsOnServer);
296    }
297    regionsOnServer.add(region);
298  }
299
300  @Override
301  public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
302    return this.fnm.getFavoredNodes(regionInfo);
303  }
304
305  private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
306    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
307    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
308    // figure the primary RSs
309    assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
310    assignSecondaryAndTertiaryNodesForRegion(assignmentHelper, regions, primaryRSMap);
311  }
312
313  private void assignSecondaryAndTertiaryNodesForRegion(
314    FavoredNodeAssignmentHelper assignmentHelper, List<RegionInfo> regions,
315    Map<RegionInfo, ServerName> primaryRSMap) throws IOException {
316    // figure the secondary and tertiary RSs
317    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
318      assignmentHelper.placeSecondaryAndTertiaryRS(primaryRSMap);
319
320    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
321    // now record all the assignments so that we can serve queries later
322    for (RegionInfo region : regions) {
323      // Store the favored nodes without startCode for the ServerName objects
324      // We don't care about the startcode; but only the hostname really
325      List<ServerName> favoredNodesForRegion = new ArrayList<>(3);
326      ServerName sn = primaryRSMap.get(region);
327      favoredNodesForRegion
328        .add(ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE));
329      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
330      if (secondaryAndTertiaryNodes != null) {
331        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
332          secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE));
333        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
334          secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE));
335      }
336      regionFNMap.put(region, favoredNodesForRegion);
337    }
338    fnm.updateFavoredNodes(regionFNMap);
339  }
340
341  /*
342   * Generate Favored Nodes for daughters during region split. If the parent does not have FN,
343   * regenerates them for the daughters. If the parent has FN, inherit two FN from parent for each
344   * daughter and generate the remaining. The primary FN for both the daughters should be the same
345   * as parent. Inherit the secondary FN from the parent but keep it different for each daughter.
346   * Choose the remaining FN randomly. This would give us better distribution over a period of time
347   * after enough splits.
348   */
349  @Override
350  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
351    RegionInfo regionA, RegionInfo regionB) throws IOException {
352
353    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
354    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
355    helper.initialize();
356
357    List<ServerName> parentFavoredNodes = getFavoredNodes(parent);
358    if (parentFavoredNodes == null) {
359      LOG.debug("Unable to find favored nodes for parent, " + parent
360        + " generating new favored nodes for daughter");
361      result.put(regionA, helper.generateFavoredNodes(regionA));
362      result.put(regionB, helper.generateFavoredNodes(regionB));
363
364    } else {
365
366      // Lets get the primary and secondary from parent for regionA
367      Set<ServerName> regionAFN =
368        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
369      result.put(regionA, Lists.newArrayList(regionAFN));
370
371      // Lets get the primary and tertiary from parent for regionB
372      Set<ServerName> regionBFN =
373        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
374      result.put(regionB, Lists.newArrayList(regionBFN));
375    }
376
377    fnm.updateFavoredNodes(result);
378  }
379
380  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
381    List<ServerName> parentFavoredNodes, Position primary, Position secondary) throws IOException {
382
383    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
384    if (parentFavoredNodes.size() >= primary.ordinal()) {
385      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
386    }
387
388    if (parentFavoredNodes.size() >= secondary.ordinal()) {
389      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
390    }
391
392    while (daughterFN.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
393      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
394      daughterFN.add(newNode);
395    }
396    return daughterFN;
397  }
398
399  /*
400   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to keep
401   * it simple.
402   */
403  @Override
404  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo[] mergeParents)
405    throws IOException {
406    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
407    regionFNMap.put(merged, getFavoredNodes(mergeParents[0]));
408    fnm.updateFavoredNodes(regionFNMap);
409  }
410
411}