001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
025
026import edu.umd.cs.findbugs.annotations.NonNull;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ThreadLocalRandom;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.ServerMetrics;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
043import org.apache.hadoop.hbase.favored.FavoredNodesManager;
044import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
045import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
046import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
047import org.apache.hadoop.hbase.master.RegionPlan;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
055import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
056
057/**
058 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns favored
059 * nodes for each region. There is a Primary RegionServer that hosts the region, and then there is
060 * Secondary and Tertiary RegionServers. Currently, the favored nodes information is used in
061 * creating HDFS files - the Primary RegionServer passes the primary, secondary, tertiary node
062 * addresses as hints to the DistributedFileSystem API for creating files on the filesystem. These
063 * nodes are treated as hints by the HDFS to place the blocks of the file. This alleviates the
064 * problem to do with reading from remote nodes (since we can make the Secondary RegionServer as the
065 * new Primary RegionServer) after a region is recovered. This should help provide consistent read
 * latencies for the regions even when their primary region servers die. This balancer provides
 * two {@link CandidateGenerator} implementations: {@code FavoredNodeLoadPicker} and
 * {@code FavoredNodeLocalityPicker}.
068 */
069@InterfaceAudience.Private
070public class FavoredStochasticBalancer extends StochasticLoadBalancer
071  implements FavoredNodesPromoter {
072
073  private static final Logger LOG = LoggerFactory.getLogger(FavoredStochasticBalancer.class);
074
075  private FavoredNodesManager fnm;
076
077  @Override
078  public void setFavoredNodesManager(FavoredNodesManager fnm) {
079    this.fnm = fnm;
080  }
081
082  @Override
083  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
084    createCandidateGenerators() {
085    Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = new HashMap<>(2);
086    fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
087    fnPickers.put(FavoredNodeLocalityPicker.class, new FavoredNodeLocalityPicker());
088    return fnPickers;
089  }
090
091  /** Returns any candidate generator in random */
092  @Override
093  protected CandidateGenerator getRandomGenerator() {
094    Class<? extends CandidateGenerator> clazz = shuffledGeneratorClasses.get()
095      .get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
096    return candidateGenerators.get(clazz);
097  }
098
099  /**
100   * Round robin assignment: Segregate the regions into two types: 1. The regions that have favored
101   * node assignment where at least one of the favored node is still alive. In this case, try to
102   * adhere to the current favored nodes assignment as much as possible - i.e., if the current
103   * primary is gone, then make the secondary or tertiary as the new host for the region (based on
104   * their current load). Note that we don't change the favored node assignments here (even though
105   * one or more favored node is currently down). That will be done by the admin operations. 2. The
106   * regions that currently don't have favored node assignments. Generate favored nodes for them and
107   * then assign. Generate the primary fn in round robin fashion and generate secondary and tertiary
108   * as per favored nodes constraints.
109   */
110  @Override
111  @NonNull
112  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
113    List<ServerName> servers) throws HBaseIOException {
114    metricsBalancer.incrMiscInvocations();
115    Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
116    if (regions.isEmpty()) {
117      return assignmentMap;
118    }
119    Set<RegionInfo> regionSet = new HashSet<>(regions);
120    try {
121      FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
122      helper.initialize();
123
124      Set<RegionInfo> systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet);
125      regionSet.removeAll(systemRegions);
126
127      // Assign all system regions
128      Map<ServerName, List<RegionInfo>> systemAssignments =
129        super.roundRobinAssignment(Lists.newArrayList(systemRegions), servers);
130
131      // Segregate favored and non-favored nodes regions and assign accordingly.
132      Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
133        segregateRegionsAndAssignRegionsWithFavoredNodes(regionSet, servers);
134      Map<ServerName, List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
135      Map<ServerName, List<RegionInfo>> regionsWithoutFN =
136        generateFNForRegionsWithoutFN(helper, segregatedRegions.getSecond());
137
138      // merge the assignment maps
139      mergeAssignmentMaps(assignmentMap, systemAssignments);
140      mergeAssignmentMaps(assignmentMap, regionsWithFavoredNodesMap);
141      mergeAssignmentMaps(assignmentMap, regionsWithoutFN);
142
143    } catch (Exception ex) {
144      throw new HBaseIOException("Encountered exception while doing favored-nodes assignment " + ex
145        + " Falling back to regular assignment", ex);
146    }
147    return assignmentMap;
148  }
149
150  private void mergeAssignmentMaps(Map<ServerName, List<RegionInfo>> assignmentMap,
151    Map<ServerName, List<RegionInfo>> otherAssignments) {
152
153    if (otherAssignments == null || otherAssignments.isEmpty()) {
154      return;
155    }
156
157    for (Map.Entry<ServerName, List<RegionInfo>> entry : otherAssignments.entrySet()) {
158      ServerName sn = entry.getKey();
159      List<RegionInfo> regionsList = entry.getValue();
160      if (assignmentMap.get(sn) == null) {
161        assignmentMap.put(sn, Lists.newArrayList(regionsList));
162      } else {
163        assignmentMap.get(sn).addAll(regionsList);
164      }
165    }
166  }
167
168  private Map<ServerName, List<RegionInfo>> generateFNForRegionsWithoutFN(
169    FavoredNodeAssignmentHelper helper, List<RegionInfo> regions) throws IOException {
170
171    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
172    Map<RegionInfo, List<ServerName>> regionsNoFNMap;
173
174    if (regions.size() > 0) {
175      regionsNoFNMap = helper.generateFavoredNodesRoundRobin(assignmentMap, regions);
176      fnm.updateFavoredNodes(regionsNoFNMap);
177    }
178    return assignmentMap;
179  }
180
181  /**
182   * Return a pair - one with assignments when favored nodes are present and another with regions
183   * without favored nodes.
184   */
185  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
186    segregateRegionsAndAssignRegionsWithFavoredNodes(Collection<RegionInfo> regions,
187      List<ServerName> onlineServers) throws HBaseIOException {
188
189    // Since we expect FN to be present most of the time, lets create map with same size
190    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes =
191      new HashMap<>(onlineServers.size());
192    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>();
193
194    for (RegionInfo region : regions) {
195      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
196      ServerName primaryHost = null;
197      ServerName secondaryHost = null;
198      ServerName tertiaryHost = null;
199
200      if (favoredNodes != null && !favoredNodes.isEmpty()) {
201        for (ServerName s : favoredNodes) {
202          ServerName serverWithLegitStartCode = getServerFromFavoredNode(onlineServers, s);
203          if (serverWithLegitStartCode != null) {
204            FavoredNodesPlan.Position position =
205              FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
206            if (Position.PRIMARY.equals(position)) {
207              primaryHost = serverWithLegitStartCode;
208            } else if (Position.SECONDARY.equals(position)) {
209              secondaryHost = serverWithLegitStartCode;
210            } else if (Position.TERTIARY.equals(position)) {
211              tertiaryHost = serverWithLegitStartCode;
212            }
213          }
214        }
215        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost,
216          secondaryHost, tertiaryHost);
217      } else {
218        regionsWithNoFavoredNodes.add(region);
219      }
220    }
221    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
222  }
223
224  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
225    RegionInfo region, ServerName host) {
226    List<RegionInfo> regionsOnServer = assignmentMapForFavoredNodes.get(host);
227    if (regionsOnServer == null) {
228      regionsOnServer = Lists.newArrayList();
229      assignmentMapForFavoredNodes.put(host, regionsOnServer);
230    }
231    regionsOnServer.add(region);
232  }
233
234  /**
235   * Get the ServerName for the FavoredNode. Since FN's startcode is -1, we could want to get the
236   * ServerName with the correct start code from the list of provided servers.
237   */
238  private ServerName getServerFromFavoredNode(List<ServerName> servers, ServerName fn) {
239    for (ServerName server : servers) {
240      if (ServerName.isSameAddress(fn, server)) {
241        return server;
242      }
243    }
244    return null;
245  }
246
247  /**
248   * Assign the region to primary if its available. If both secondary and tertiary are available,
249   * assign to the host which has less load. Else assign to secondary or tertiary whichever is
250   * available (in that order).
251   */
252  private void assignRegionToAvailableFavoredNode(
253    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region,
254    ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
255    if (primaryHost != null) {
256      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
257    } else if (secondaryHost != null && tertiaryHost != null) {
258      // Assign the region to the one with a lower load (both have the desired hdfs blocks)
259      ServerName s;
260      ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
261      ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
262      if (secondaryLoad != null && tertiaryLoad != null) {
263        if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
264          s = secondaryHost;
265        } else {
266          s = tertiaryHost;
267        }
268      } else {
269        // We don't have one/more load, lets just choose a random node
270        s = ThreadLocalRandom.current().nextBoolean() ? secondaryHost : tertiaryHost;
271      }
272      addRegionToMap(assignmentMapForFavoredNodes, region, s);
273    } else if (secondaryHost != null) {
274      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
275    } else if (tertiaryHost != null) {
276      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
277    } else {
278      // No favored nodes are online, lets assign to BOGUS server
279      addRegionToMap(assignmentMapForFavoredNodes, region, BOGUS_SERVER_NAME);
280    }
281  }
282
283  /**
284   * If we have favored nodes for a region, we will return one of the FN as destination. If favored
285   * nodes are not present for a region, we will generate and return one of the FN as destination.
286   * If we can't generate anything, lets fallback.
287   */
288  @Override
289  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
290    throws HBaseIOException {
291    ServerName destination = null;
292    if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) {
293      return super.randomAssignment(regionInfo, servers);
294    }
295
296    metricsBalancer.incrMiscInvocations();
297
298    Configuration conf = getConf();
299    List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
300    if (favoredNodes == null || favoredNodes.isEmpty()) {
301      // Generate new favored nodes and return primary
302      FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, conf);
303      helper.initialize();
304      try {
305        favoredNodes = helper.generateFavoredNodes(regionInfo);
306        updateFavoredNodesForRegion(regionInfo, favoredNodes);
307
308      } catch (IOException e) {
309        LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e);
310        throw new HBaseIOException(e);
311      }
312    }
313
314    List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
315    if (onlineServers.size() > 0) {
316      destination = onlineServers.get(ThreadLocalRandom.current().nextInt(onlineServers.size()));
317    }
318
319    boolean alwaysAssign = conf.getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
320    if (destination == null && alwaysAssign) {
321      LOG.warn("Can't generate FN for region: " + regionInfo + " falling back");
322      destination = super.randomAssignment(regionInfo, servers);
323    }
324    return destination;
325  }
326
327  private void updateFavoredNodesForRegion(RegionInfo regionInfo, List<ServerName> newFavoredNodes)
328    throws IOException {
329    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
330    regionFNMap.put(regionInfo, newFavoredNodes);
331    fnm.updateFavoredNodes(regionFNMap);
332  }
333
334  /**
335   * Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing.
336   */
337  @Override
338  @NonNull
339  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
340    List<ServerName> servers) throws HBaseIOException {
341    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
342    Map<ServerName, List<RegionInfo>> result = super.retainAssignment(regions, servers);
343    if (result.isEmpty()) {
344      LOG.warn("Nothing to assign to, probably no servers or no regions");
345      return result;
346    }
347
348    // Lets check if favored nodes info is in META, if not generate now.
349    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
350    helper.initialize();
351
352    LOG.debug("Generating favored nodes for regions missing them.");
353    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
354
355    try {
356      for (Map.Entry<ServerName, List<RegionInfo>> entry : result.entrySet()) {
357        ServerName sn = entry.getKey();
358        ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
359
360        for (RegionInfo hri : entry.getValue()) {
361
362          if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
363            List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
364            if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) {
365
366              LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary);
367              ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary);
368              if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
369                List<ServerName> newFavoredNodes = Lists.newArrayList();
370                newFavoredNodes.add(primary);
371                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
372                  secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
373                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
374                  secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
375                regionFNMap.put(hri, newFavoredNodes);
376                addRegionToMap(assignmentMap, hri, sn);
377
378              } else {
379                throw new HBaseIOException(
380                  "Cannot generate secondary/tertiary FN for " + hri + " generated "
381                    + (secondaryAndTertiaryNodes != null ? secondaryAndTertiaryNodes : " nothing"));
382              }
383            } else {
384              List<ServerName> onlineFN = getOnlineFavoredNodes(servers, favoredNodes);
385              if (onlineFN.isEmpty()) {
386                // All favored nodes are dead, lets assign it to BOGUS
387                addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME);
388              } else {
389                // Is primary not on FN? Less likely, but we can still take care of this.
390                if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
391                  addRegionToMap(assignmentMap, hri, sn);
392                } else {
393                  ServerName destination =
394                    onlineFN.get(ThreadLocalRandom.current().nextInt(onlineFN.size()));
395                  LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
396                    + " current: " + sn + " moving to: " + destination);
397                  addRegionToMap(assignmentMap, hri, destination);
398                }
399              }
400            }
401          } else {
402            addRegionToMap(assignmentMap, hri, sn);
403          }
404        }
405      }
406
407      if (!regionFNMap.isEmpty()) {
408        LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size());
409        fnm.updateFavoredNodes(regionFNMap);
410      }
411
412    } catch (IOException e) {
413      throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet());
414    }
415
416    return assignmentMap;
417  }
418
419  /**
420   * Return list of favored nodes that are online.
421   */
422  private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
423    List<ServerName> serversWithoutStartCodes) {
424    if (serversWithoutStartCodes == null) {
425      return null;
426    } else {
427      List<ServerName> result = Lists.newArrayList();
428      for (ServerName sn : serversWithoutStartCodes) {
429        for (ServerName online : onlineServers) {
430          if (ServerName.isSameAddress(sn, online)) {
431            result.add(online);
432          }
433        }
434      }
435      return result;
436    }
437  }
438
439  @Override
440  public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
441    return this.fnm.getFavoredNodes(regionInfo);
442  }
443
444  /**
445   * Generate Favored Nodes for daughters during region split.
446   * <p/>
447   * If the parent does not have FN, regenerates them for the daughters.
448   * <p/>
449   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
450   * The primary FN for both the daughters should be the same as parent. Inherit the secondary FN
451   * from the parent but keep it different for each daughter. Choose the remaining FN randomly. This
452   * would give us better distribution over a period of time after enough splits.
453   */
454  @Override
455  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
456    RegionInfo regionA, RegionInfo regionB) throws IOException {
457    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
458    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
459    helper.initialize();
460
461    List<ServerName> parentFavoredNodes = fnm.getFavoredNodes(parent);
462    if (parentFavoredNodes == null) {
463      LOG.debug("Unable to find favored nodes for parent, " + parent
464        + " generating new favored nodes for daughter");
465      result.put(regionA, helper.generateFavoredNodes(regionA));
466      result.put(regionB, helper.generateFavoredNodes(regionB));
467
468    } else {
469
470      // Lets get the primary and secondary from parent for regionA
471      Set<ServerName> regionAFN =
472        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
473      result.put(regionA, Lists.newArrayList(regionAFN));
474
475      // Lets get the primary and tertiary from parent for regionB
476      Set<ServerName> regionBFN =
477        getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
478      result.put(regionB, Lists.newArrayList(regionBFN));
479    }
480
481    fnm.updateFavoredNodes(result);
482  }
483
484  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
485    List<ServerName> parentFavoredNodes, Position primary, Position secondary) throws IOException {
486
487    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
488    if (parentFavoredNodes.size() >= primary.ordinal()) {
489      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
490    }
491
492    if (parentFavoredNodes.size() >= secondary.ordinal()) {
493      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
494    }
495
496    while (daughterFN.size() < FAVORED_NODES_NUM) {
497      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
498      daughterFN.add(newNode);
499    }
500    return daughterFN;
501  }
502
503  /**
504   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to keep
505   * it simple.
506   */
507  @Override
508  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo[] mergeParents)
509    throws IOException {
510    updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(mergeParents[0]));
511  }
512
513  /**
514   * Pick favored nodes with the highest locality for a region with lowest locality.
515   */
516  private class FavoredNodeLocalityPicker extends CandidateGenerator {
517
518    @Override
519    protected BalanceAction generate(BalancerClusterState cluster) {
520
521      int thisServer = pickRandomServer(cluster);
522      int thisRegion;
523      if (thisServer == -1) {
524        LOG.trace("Could not pick lowest local region server");
525        return BalanceAction.NULL_ACTION;
526      } else {
527        // Pick lowest local region on this server
528        thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
529      }
530      if (thisRegion == -1) {
531        if (cluster.regionsPerServer[thisServer].length > 0) {
532          LOG.trace("Could not pick lowest local region even when region server held "
533            + cluster.regionsPerServer[thisServer].length + " regions");
534        }
535        return BalanceAction.NULL_ACTION;
536      }
537
538      RegionInfo hri = cluster.regions[thisRegion];
539      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
540      int otherServer;
541      if (favoredNodes == null) {
542        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
543          otherServer = pickOtherRandomServer(cluster, thisServer);
544        } else {
545          // No FN, ignore
546          LOG.trace("Ignoring, no favored nodes for region: " + hri);
547          return BalanceAction.NULL_ACTION;
548        }
549      } else {
550        // Pick other favored node with the highest locality
551        otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer);
552      }
553      return getAction(thisServer, thisRegion, otherServer, -1);
554    }
555
556    private int getDifferentFavoredNode(BalancerClusterState cluster, List<ServerName> favoredNodes,
557      int currentServer) {
558      List<Integer> fnIndex = new ArrayList<>();
559      for (ServerName sn : favoredNodes) {
560        if (cluster.serversToIndex.containsKey(sn.getAddress())) {
561          fnIndex.add(cluster.serversToIndex.get(sn.getAddress()));
562        }
563      }
564      float locality = 0;
565      int highestLocalRSIndex = -1;
566      for (Integer index : fnIndex) {
567        if (index != currentServer) {
568          float temp = cluster.localityPerServer[index];
569          if (temp >= locality) {
570            locality = temp;
571            highestLocalRSIndex = index;
572          }
573        }
574      }
575      return highestLocalRSIndex;
576    }
577
578    private int pickLowestLocalRegionOnServer(BalancerClusterState cluster, int server) {
579      return cluster.getLowestLocalityRegionOnServer(server);
580    }
581  }
582
583  /*
584   * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the most
585   * loaded server.
586   */
587  class FavoredNodeLoadPicker extends CandidateGenerator {
588
589    @Override
590    BalanceAction generate(BalancerClusterState cluster) {
591      cluster.sortServersByRegionCount();
592      int thisServer = pickMostLoadedServer(cluster);
593      int thisRegion = pickRandomRegion(cluster, thisServer, 0);
594      RegionInfo hri = cluster.regions[thisRegion];
595      int otherServer;
596      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
597      if (favoredNodes == null) {
598        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
599          otherServer = pickLeastLoadedServer(cluster, thisServer);
600        } else {
601          return BalanceAction.NULL_ACTION;
602        }
603      } else {
604        otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
605      }
606      return getAction(thisServer, thisRegion, otherServer, -1);
607    }
608
609    private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
610      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
611      int index;
612      for (index = 0; index < servers.length; index++) {
613        if ((servers[index] != null) && servers[index] != thisServer) {
614          break;
615        }
616      }
617      return servers[index];
618    }
619
620    private int pickLeastLoadedFNServer(final BalancerClusterState cluster,
621      List<ServerName> favoredNodes, int currentServerIndex) {
622      List<Integer> fnIndex = new ArrayList<>();
623      for (ServerName sn : favoredNodes) {
624        if (cluster.serversToIndex.containsKey(sn.getAddress())) {
625          fnIndex.add(cluster.serversToIndex.get(sn.getAddress()));
626        }
627      }
628      int leastLoadedFN = -1;
629      int load = Integer.MAX_VALUE;
630      for (Integer index : fnIndex) {
631        if (index != currentServerIndex) {
632          int temp = cluster.getNumRegions(index);
633          if (temp < load) {
634            load = temp;
635            leastLoadedFN = index;
636          }
637        }
638      }
639      return leastLoadedFN;
640    }
641
642    private int pickMostLoadedServer(final BalancerClusterState cluster) {
643      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
644      int index;
645      for (index = servers.length - 1; index > 0; index--) {
646        if (servers[index] != null) {
647          break;
648        }
649      }
650      return servers[index];
651    }
652  }
653
654  /**
655   * For all regions correctly assigned to favored nodes, we just use the stochastic balancer
656   * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
657   */
658  @Override
659  protected List<RegionPlan> balanceTable(TableName tableName,
660    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
661    List<RegionPlan> regionPlans = Lists.newArrayList();
662    Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
663    int misplacedRegions = 0;
664
665    for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
666      ServerName current = entry.getKey();
667      List<RegionInfo> regions = Lists.newArrayList();
668      correctAssignments.put(current, regions);
669
670      for (RegionInfo hri : entry.getValue()) {
671        List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
672        if (
673          FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null
674            || !FavoredNodesManager.isFavoredNodeApplicable(hri)
675        ) {
676          regions.add(hri);
677        } else {
678          // No favored nodes, lets unassign.
679          LOG.warn("Region not on favored nodes, unassign. Region: " + hri + " current: " + current
680            + " favored nodes: " + favoredNodes);
681          try {
682            provider.unassign(hri);
683          } catch (IOException e) {
684            LOG.warn("Failed unassign", e);
685            continue;
686          }
687          RegionPlan rp = new RegionPlan(hri, null, null);
688          regionPlans.add(rp);
689          misplacedRegions++;
690        }
691      }
692    }
693    LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
694    List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
695    if (regionPlansFromBalance != null) {
696      regionPlans.addAll(regionPlansFromBalance);
697    }
698    return regionPlans;
699  }
700}