001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
025
026import edu.umd.cs.findbugs.annotations.NonNull;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ThreadLocalRandom;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.ServerMetrics;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
043import org.apache.hadoop.hbase.favored.FavoredNodesManager;
044import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
045import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
046import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
047import org.apache.hadoop.hbase.master.RegionPlan;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
056
057/**
058 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns favored
059 * nodes for each region. There is a Primary RegionServer that hosts the region, and then there is
060 * Secondary and Tertiary RegionServers. Currently, the favored nodes information is used in
061 * creating HDFS files - the Primary RegionServer passes the primary, secondary, tertiary node
062 * addresses as hints to the DistributedFileSystem API for creating files on the filesystem. These
063 * nodes are treated as hints by the HDFS to place the blocks of the file. This alleviates the
064 * problem to do with reading from remote nodes (since we can make the Secondary RegionServer as the
065 * new Primary RegionServer) after a region is recovered. This should help provide consistent read
 * latencies for the regions even when their primary region servers die. This balancer provides
 * two {@link CandidateGenerator} implementations, {@code FavoredNodeLoadPicker} and
 * {@code FavoredNodeLocalityPicker}.
068 */
069@InterfaceAudience.Private
070public class FavoredStochasticBalancer extends StochasticLoadBalancer
071  implements FavoredNodesPromoter {
072
073  private static final Logger LOG = LoggerFactory.getLogger(FavoredStochasticBalancer.class);
074
075  private FavoredNodesManager fnm;
076
077  @Override
078  public void setFavoredNodesManager(FavoredNodesManager fnm) {
079    this.fnm = fnm;
080  }
081
082  @Override
083  protected List<CandidateGenerator> createCandidateGenerators() {
084    List<CandidateGenerator> fnPickers = new ArrayList<>(2);
085    fnPickers.add(new FavoredNodeLoadPicker());
086    fnPickers.add(new FavoredNodeLocalityPicker());
087    return fnPickers;
088  }
089
090  /** Returns any candidate generator in random */
091  @Override
092  protected CandidateGenerator getRandomGenerator() {
093    return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
094  }
095
096  /**
097   * Round robin assignment: Segregate the regions into two types: 1. The regions that have favored
098   * node assignment where at least one of the favored node is still alive. In this case, try to
099   * adhere to the current favored nodes assignment as much as possible - i.e., if the current
100   * primary is gone, then make the secondary or tertiary as the new host for the region (based on
101   * their current load). Note that we don't change the favored node assignments here (even though
102   * one or more favored node is currently down). That will be done by the admin operations. 2. The
103   * regions that currently don't have favored node assignments. Generate favored nodes for them and
104   * then assign. Generate the primary fn in round robin fashion and generate secondary and tertiary
105   * as per favored nodes constraints.
106   */
107  @Override
108  @NonNull
109  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
110    List<ServerName> servers) throws HBaseIOException {
111    metricsBalancer.incrMiscInvocations();
112    Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
113    if (regions.isEmpty()) {
114      return assignmentMap;
115    }
116    Set<RegionInfo> regionSet = new HashSet<>(regions);
117    try {
118      FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
119      helper.initialize();
120
121      Set<RegionInfo> systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet);
122      regionSet.removeAll(systemRegions);
123
124      // Assign all system regions
125      Map<ServerName, List<RegionInfo>> systemAssignments =
126        super.roundRobinAssignment(Lists.newArrayList(systemRegions), servers);
127
128      // Segregate favored and non-favored nodes regions and assign accordingly.
129      Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
130        segregateRegionsAndAssignRegionsWithFavoredNodes(regionSet, servers);
131      Map<ServerName, List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
132      Map<ServerName, List<RegionInfo>> regionsWithoutFN =
133        generateFNForRegionsWithoutFN(helper, segregatedRegions.getSecond());
134
135      // merge the assignment maps
136      mergeAssignmentMaps(assignmentMap, systemAssignments);
137      mergeAssignmentMaps(assignmentMap, regionsWithFavoredNodesMap);
138      mergeAssignmentMaps(assignmentMap, regionsWithoutFN);
139
140    } catch (Exception ex) {
141      throw new HBaseIOException("Encountered exception while doing favored-nodes assignment " + ex
142        + " Falling back to regular assignment", ex);
143    }
144    return assignmentMap;
145  }
146
147  private void mergeAssignmentMaps(Map<ServerName, List<RegionInfo>> assignmentMap,
148    Map<ServerName, List<RegionInfo>> otherAssignments) {
149
150    if (otherAssignments == null || otherAssignments.isEmpty()) {
151      return;
152    }
153
154    for (Map.Entry<ServerName, List<RegionInfo>> entry : otherAssignments.entrySet()) {
155      ServerName sn = entry.getKey();
156      List<RegionInfo> regionsList = entry.getValue();
157      if (assignmentMap.get(sn) == null) {
158        assignmentMap.put(sn, Lists.newArrayList(regionsList));
159      } else {
160        assignmentMap.get(sn).addAll(regionsList);
161      }
162    }
163  }
164
165  private Map<ServerName, List<RegionInfo>> generateFNForRegionsWithoutFN(
166    FavoredNodeAssignmentHelper helper, List<RegionInfo> regions) throws IOException {
167
168    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
169    Map<RegionInfo, List<ServerName>> regionsNoFNMap;
170
171    if (regions.size() > 0) {
172      regionsNoFNMap = helper.generateFavoredNodesRoundRobin(assignmentMap, regions);
173      fnm.updateFavoredNodes(regionsNoFNMap);
174    }
175    return assignmentMap;
176  }
177
178  /**
179   * Return a pair - one with assignments when favored nodes are present and another with regions
180   * without favored nodes.
181   */
182  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
183    segregateRegionsAndAssignRegionsWithFavoredNodes(Collection<RegionInfo> regions,
184      List<ServerName> onlineServers) throws HBaseIOException {
185
186    // Since we expect FN to be present most of the time, lets create map with same size
187    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes =
188      new HashMap<>(onlineServers.size());
189    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>();
190
191    for (RegionInfo region : regions) {
192      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
193      ServerName primaryHost = null;
194      ServerName secondaryHost = null;
195      ServerName tertiaryHost = null;
196
197      if (favoredNodes != null && !favoredNodes.isEmpty()) {
198        for (ServerName s : favoredNodes) {
199          ServerName serverWithLegitStartCode = getServerFromFavoredNode(onlineServers, s);
200          if (serverWithLegitStartCode != null) {
201            FavoredNodesPlan.Position position =
202              FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
203            if (Position.PRIMARY.equals(position)) {
204              primaryHost = serverWithLegitStartCode;
205            } else if (Position.SECONDARY.equals(position)) {
206              secondaryHost = serverWithLegitStartCode;
207            } else if (Position.TERTIARY.equals(position)) {
208              tertiaryHost = serverWithLegitStartCode;
209            }
210          }
211        }
212        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost,
213          secondaryHost, tertiaryHost);
214      } else {
215        regionsWithNoFavoredNodes.add(region);
216      }
217    }
218    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
219  }
220
221  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
222    RegionInfo region, ServerName host) {
223    List<RegionInfo> regionsOnServer = assignmentMapForFavoredNodes.get(host);
224    if (regionsOnServer == null) {
225      regionsOnServer = Lists.newArrayList();
226      assignmentMapForFavoredNodes.put(host, regionsOnServer);
227    }
228    regionsOnServer.add(region);
229  }
230
231  /**
232   * Get the ServerName for the FavoredNode. Since FN's startcode is -1, we could want to get the
233   * ServerName with the correct start code from the list of provided servers.
234   */
235  private ServerName getServerFromFavoredNode(List<ServerName> servers, ServerName fn) {
236    for (ServerName server : servers) {
237      if (ServerName.isSameAddress(fn, server)) {
238        return server;
239      }
240    }
241    return null;
242  }
243
244  /**
245   * Assign the region to primary if its available. If both secondary and tertiary are available,
246   * assign to the host which has less load. Else assign to secondary or tertiary whichever is
247   * available (in that order).
248   */
249  private void assignRegionToAvailableFavoredNode(
250    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region,
251    ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
252    if (primaryHost != null) {
253      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
254    } else if (secondaryHost != null && tertiaryHost != null) {
255      // Assign the region to the one with a lower load (both have the desired hdfs blocks)
256      ServerName s;
257      ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
258      ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
259      if (secondaryLoad != null && tertiaryLoad != null) {
260        if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
261          s = secondaryHost;
262        } else {
263          s = tertiaryHost;
264        }
265      } else {
266        // We don't have one/more load, lets just choose a random node
267        s = ThreadLocalRandom.current().nextBoolean() ? secondaryHost : tertiaryHost;
268      }
269      addRegionToMap(assignmentMapForFavoredNodes, region, s);
270    } else if (secondaryHost != null) {
271      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
272    } else if (tertiaryHost != null) {
273      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
274    } else {
275      // No favored nodes are online, lets assign to BOGUS server
276      addRegionToMap(assignmentMapForFavoredNodes, region, BOGUS_SERVER_NAME);
277    }
278  }
279
280  /**
281   * If we have favored nodes for a region, we will return one of the FN as destination. If favored
282   * nodes are not present for a region, we will generate and return one of the FN as destination.
283   * If we can't generate anything, lets fallback.
284   */
285  @Override
286  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
287    throws HBaseIOException {
288    ServerName destination = null;
289    if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) {
290      return super.randomAssignment(regionInfo, servers);
291    }
292
293    metricsBalancer.incrMiscInvocations();
294
295    Configuration conf = getConf();
296    List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
297    if (favoredNodes == null || favoredNodes.isEmpty()) {
298      // Generate new favored nodes and return primary
299      FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, conf);
300      helper.initialize();
301      try {
302        favoredNodes = helper.generateFavoredNodes(regionInfo);
303        updateFavoredNodesForRegion(regionInfo, favoredNodes);
304
305      } catch (IOException e) {
306        LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e);
307        throw new HBaseIOException(e);
308      }
309    }
310
311    List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
312    if (onlineServers.size() > 0) {
313      destination = onlineServers.get(ThreadLocalRandom.current().nextInt(onlineServers.size()));
314    }
315
316    boolean alwaysAssign = conf.getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
317    if (destination == null && alwaysAssign) {
318      LOG.warn("Can't generate FN for region: " + regionInfo + " falling back");
319      destination = super.randomAssignment(regionInfo, servers);
320    }
321    return destination;
322  }
323
324  private void updateFavoredNodesForRegion(RegionInfo regionInfo, List<ServerName> newFavoredNodes)
325    throws IOException {
326    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
327    regionFNMap.put(regionInfo, newFavoredNodes);
328    fnm.updateFavoredNodes(regionFNMap);
329  }
330
331  /**
332   * Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing.
333   */
334  @Override
335  @NonNull
336  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
337    List<ServerName> servers) throws HBaseIOException {
338    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
339    Map<ServerName, List<RegionInfo>> result = super.retainAssignment(regions, servers);
340    if (result.isEmpty()) {
341      LOG.warn("Nothing to assign to, probably no servers or no regions");
342      return result;
343    }
344
345    // Lets check if favored nodes info is in META, if not generate now.
346    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
347    helper.initialize();
348
349    LOG.debug("Generating favored nodes for regions missing them.");
350    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
351
352    try {
353      for (Map.Entry<ServerName, List<RegionInfo>> entry : result.entrySet()) {
354        ServerName sn = entry.getKey();
355        ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
356
357        for (RegionInfo hri : entry.getValue()) {
358
359          if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
360            List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
361            if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) {
362
363              LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary);
364              ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary);
365              if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
366                List<ServerName> newFavoredNodes = Lists.newArrayList();
367                newFavoredNodes.add(primary);
368                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
369                  secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
370                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
371                  secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
372                regionFNMap.put(hri, newFavoredNodes);
373                addRegionToMap(assignmentMap, hri, sn);
374
375              } else {
376                throw new HBaseIOException(
377                  "Cannot generate secondary/tertiary FN for " + hri + " generated "
378                    + (secondaryAndTertiaryNodes != null ? secondaryAndTertiaryNodes : " nothing"));
379              }
380            } else {
381              List<ServerName> onlineFN = getOnlineFavoredNodes(servers, favoredNodes);
382              if (onlineFN.isEmpty()) {
383                // All favored nodes are dead, lets assign it to BOGUS
384                addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME);
385              } else {
386                // Is primary not on FN? Less likely, but we can still take care of this.
387                if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
388                  addRegionToMap(assignmentMap, hri, sn);
389                } else {
390                  ServerName destination =
391                    onlineFN.get(ThreadLocalRandom.current().nextInt(onlineFN.size()));
392                  LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
393                    + " current: " + sn + " moving to: " + destination);
394                  addRegionToMap(assignmentMap, hri, destination);
395                }
396              }
397            }
398          } else {
399            addRegionToMap(assignmentMap, hri, sn);
400          }
401        }
402      }
403
404      if (!regionFNMap.isEmpty()) {
405        LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size());
406        fnm.updateFavoredNodes(regionFNMap);
407      }
408
409    } catch (IOException e) {
410      throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet());
411    }
412
413    return assignmentMap;
414  }
415
416  /**
417   * Return list of favored nodes that are online.
418   */
419  private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
420    List<ServerName> serversWithoutStartCodes) {
421    if (serversWithoutStartCodes == null) {
422      return null;
423    } else {
424      List<ServerName> result = Lists.newArrayList();
425      for (ServerName sn : serversWithoutStartCodes) {
426        for (ServerName online : onlineServers) {
427          if (ServerName.isSameAddress(sn, online)) {
428            result.add(online);
429          }
430        }
431      }
432      return result;
433    }
434  }
435
436  @Override
437  public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
438    return this.fnm.getFavoredNodes(regionInfo);
439  }
440
441  /**
442   * Generate Favored Nodes for daughters during region split.
443   * <p/>
444   * If the parent does not have FN, regenerates them for the daughters.
445   * <p/>
446   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
447   * The primary FN for both the daughters should be the same as parent. Inherit the secondary FN
448   * from the parent but keep it different for each daughter. Choose the remaining FN randomly. This
449   * would give us better distribution over a period of time after enough splits.
450   */
451  @Override
452  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
453    RegionInfo regionA, RegionInfo regionB) throws IOException {
454    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
455    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
456    helper.initialize();
457
458    List<ServerName> parentFavoredNodes = fnm.getFavoredNodes(parent);
459    if (parentFavoredNodes == null) {
460      LOG.debug("Unable to find favored nodes for parent, " + parent
461        + " generating new favored nodes for daughter");
462      result.put(regionA, helper.generateFavoredNodes(regionA));
463      result.put(regionB, helper.generateFavoredNodes(regionB));
464
465    } else {
466
467      // Lets get the primary and secondary from parent for regionA
468      Set<ServerName> regionAFN =
469        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
470      result.put(regionA, Lists.newArrayList(regionAFN));
471
472      // Lets get the primary and tertiary from parent for regionB
473      Set<ServerName> regionBFN =
474        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
475      result.put(regionB, Lists.newArrayList(regionBFN));
476    }
477
478    fnm.updateFavoredNodes(result);
479  }
480
481  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
482    List<ServerName> parentFavoredNodes, Position primary, Position secondary) throws IOException {
483
484    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
485    if (parentFavoredNodes.size() >= primary.ordinal()) {
486      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
487    }
488
489    if (parentFavoredNodes.size() >= secondary.ordinal()) {
490      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
491    }
492
493    while (daughterFN.size() < FAVORED_NODES_NUM) {
494      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
495      daughterFN.add(newNode);
496    }
497    return daughterFN;
498  }
499
500  /**
501   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to keep
502   * it simple.
503   */
504  @Override
505  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo[] mergeParents)
506    throws IOException {
507    updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(mergeParents[0]));
508  }
509
510  /**
511   * Pick favored nodes with the highest locality for a region with lowest locality.
512   */
513  private class FavoredNodeLocalityPicker extends CandidateGenerator {
514
515    @Override
516    protected BalanceAction generate(BalancerClusterState cluster) {
517
518      int thisServer = pickRandomServer(cluster);
519      int thisRegion;
520      if (thisServer == -1) {
521        LOG.trace("Could not pick lowest local region server");
522        return BalanceAction.NULL_ACTION;
523      } else {
524        // Pick lowest local region on this server
525        thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
526      }
527      if (thisRegion == -1) {
528        if (cluster.regionsPerServer[thisServer].length > 0) {
529          LOG.trace("Could not pick lowest local region even when region server held "
530            + cluster.regionsPerServer[thisServer].length + " regions");
531        }
532        return BalanceAction.NULL_ACTION;
533      }
534
535      RegionInfo hri = cluster.regions[thisRegion];
536      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
537      int otherServer;
538      if (favoredNodes == null) {
539        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
540          otherServer = pickOtherRandomServer(cluster, thisServer);
541        } else {
542          // No FN, ignore
543          LOG.trace("Ignoring, no favored nodes for region: " + hri);
544          return BalanceAction.NULL_ACTION;
545        }
546      } else {
547        // Pick other favored node with the highest locality
548        otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer);
549      }
550      return getAction(thisServer, thisRegion, otherServer, -1);
551    }
552
553    private int getDifferentFavoredNode(BalancerClusterState cluster, List<ServerName> favoredNodes,
554      int currentServer) {
555      List<Integer> fnIndex = new ArrayList<>();
556      for (ServerName sn : favoredNodes) {
557        if (cluster.serversToIndex.containsKey(sn.getAddress())) {
558          fnIndex.add(cluster.serversToIndex.get(sn.getAddress()));
559        }
560      }
561      float locality = 0;
562      int highestLocalRSIndex = -1;
563      for (Integer index : fnIndex) {
564        if (index != currentServer) {
565          float temp = cluster.localityPerServer[index];
566          if (temp >= locality) {
567            locality = temp;
568            highestLocalRSIndex = index;
569          }
570        }
571      }
572      return highestLocalRSIndex;
573    }
574
575    private int pickLowestLocalRegionOnServer(BalancerClusterState cluster, int server) {
576      return cluster.getLowestLocalityRegionOnServer(server);
577    }
578  }
579
580  /*
581   * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the most
582   * loaded server.
583   */
584  class FavoredNodeLoadPicker extends CandidateGenerator {
585
586    @Override
587    BalanceAction generate(BalancerClusterState cluster) {
588      cluster.sortServersByRegionCount();
589      int thisServer = pickMostLoadedServer(cluster);
590      int thisRegion = pickRandomRegion(cluster, thisServer, 0);
591      RegionInfo hri = cluster.regions[thisRegion];
592      int otherServer;
593      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
594      if (favoredNodes == null) {
595        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
596          otherServer = pickLeastLoadedServer(cluster, thisServer);
597        } else {
598          return BalanceAction.NULL_ACTION;
599        }
600      } else {
601        otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
602      }
603      return getAction(thisServer, thisRegion, otherServer, -1);
604    }
605
606    private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
607      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
608      int index;
609      for (index = 0; index < servers.length; index++) {
610        if ((servers[index] != null) && servers[index] != thisServer) {
611          break;
612        }
613      }
614      return servers[index];
615    }
616
617    private int pickLeastLoadedFNServer(final BalancerClusterState cluster,
618      List<ServerName> favoredNodes, int currentServerIndex) {
619      List<Integer> fnIndex = new ArrayList<>();
620      for (ServerName sn : favoredNodes) {
621        if (cluster.serversToIndex.containsKey(sn.getAddress())) {
622          fnIndex.add(cluster.serversToIndex.get(sn.getAddress()));
623        }
624      }
625      int leastLoadedFN = -1;
626      int load = Integer.MAX_VALUE;
627      for (Integer index : fnIndex) {
628        if (index != currentServerIndex) {
629          int temp = cluster.getNumRegions(index);
630          if (temp < load) {
631            load = temp;
632            leastLoadedFN = index;
633          }
634        }
635      }
636      return leastLoadedFN;
637    }
638
639    private int pickMostLoadedServer(final BalancerClusterState cluster) {
640      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
641      int index;
642      for (index = servers.length - 1; index > 0; index--) {
643        if (servers[index] != null) {
644          break;
645        }
646      }
647      return servers[index];
648    }
649  }
650
651  /**
652   * For all regions correctly assigned to favored nodes, we just use the stochastic balancer
653   * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
654   */
655  @Override
656  protected List<RegionPlan> balanceTable(TableName tableName,
657    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
658    List<RegionPlan> regionPlans = Lists.newArrayList();
659    Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
660    int misplacedRegions = 0;
661
662    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
663      ServerName current = entry.getKey();
664      List<RegionInfo> regions = Lists.newArrayList();
665      correctAssignments.put(current, regions);
666
667      for (RegionInfo hri : entry.getValue()) {
668        List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
669        if (
670          FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null
671            || !FavoredNodesManager.isFavoredNodeApplicable(hri)
672        ) {
673          regions.add(hri);
674        } else {
675          // No favored nodes, lets unassign.
676          LOG.warn("Region not on favored nodes, unassign. Region: " + hri + " current: " + current
677            + " favored nodes: " + favoredNodes);
678          try {
679            provider.unassign(hri);
680          } catch (IOException e) {
681            LOG.warn("Failed unassign", e);
682            continue;
683          }
684          RegionPlan rp = new RegionPlan(hri, null, null);
685          regionPlans.add(rp);
686          misplacedRegions++;
687        }
688      }
689    }
690    LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
691    List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
692    if (regionPlansFromBalance != null) {
693      regionPlans.addAll(regionPlansFromBalance);
694    }
695    return regionPlans;
696  }
697}