001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.Set;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.ServerMetrics;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
039import org.apache.hadoop.hbase.favored.FavoredNodesManager;
040import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
041import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
042import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
043import org.apache.hadoop.hbase.master.LoadBalancer;
044import org.apache.hadoop.hbase.master.MasterServices;
045import org.apache.hadoop.hbase.master.RegionPlan;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
053import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
054
055/**
056 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that
057 * assigns favored nodes for each region. There is a Primary RegionServer that hosts
058 * the region, and then there is Secondary and Tertiary RegionServers. Currently, the
059 * favored nodes information is used in creating HDFS files - the Primary RegionServer
060 * passes the primary, secondary, tertiary node addresses as hints to the
061 * DistributedFileSystem API for creating files on the filesystem. These nodes are
062 * treated as hints by the HDFS to place the blocks of the file. This alleviates the
063 * problem to do with reading from remote nodes (since we can make the Secondary
064 * RegionServer as the new Primary RegionServer) after a region is recovered. This
065 * should help provide consistent read latencies for the regions even when their
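 * primary region servers die. This balancer registers two
 * {@link org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator}
 * implementations: one that picks a favored node destination by load and one that picks it by
 * locality.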
069 */
070@InterfaceAudience.Private
071public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
072    FavoredNodesPromoter {
073
074  private static final Logger LOG = LoggerFactory.getLogger(FavoredStochasticBalancer.class);
075  private FavoredNodesManager fnm;
076
077  @Override
078  public void initialize() throws HBaseIOException {
079    configureGenerators();
080    super.initialize();
081  }
082
083  protected void configureGenerators() {
084    List<CandidateGenerator> fnPickers = new ArrayList<>(2);
085    fnPickers.add(new FavoredNodeLoadPicker());
086    fnPickers.add(new FavoredNodeLocalityPicker());
087    setCandidateGenerators(fnPickers);
088  }
089
090  @Override
091  public synchronized void setMasterServices(MasterServices masterServices) {
092    super.setMasterServices(masterServices);
093    fnm = masterServices.getFavoredNodesManager();
094  }
095
096  /*
097   * Round robin assignment: Segregate the regions into two types:
098   *
099   * 1. The regions that have favored node assignment where at least one of the favored node
100   * is still alive. In this case, try to adhere to the current favored nodes assignment as
101   * much as possible - i.e., if the current primary is gone, then make the secondary or
102   * tertiary as the new host for the region (based on their current load). Note that we don't
103   * change the favored node assignments here (even though one or more favored node is
104   * currently down). That will be done by the admin operations.
105   *
106   * 2. The regions that currently don't have favored node assignments. Generate favored nodes
107   * for them and then assign. Generate the primary fn in round robin fashion and generate
108   * secondary and tertiary as per favored nodes constraints.
109   */
110  @Override
111  public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
112      List<ServerName> servers) throws HBaseIOException {
113
114    metricsBalancer.incrMiscInvocations();
115
116    Set<RegionInfo> regionSet = Sets.newHashSet(regions);
117    Map<ServerName, List<RegionInfo>> assignmentMap = assignMasterSystemRegions(regions, servers);
118    if (assignmentMap != null && !assignmentMap.isEmpty()) {
119      servers = new ArrayList<>(servers);
120      // Guarantee not to put other regions on master
121      servers.remove(masterServerName);
122      List<RegionInfo> masterRegions = assignmentMap.get(masterServerName);
123      if (!masterRegions.isEmpty()) {
124        for (RegionInfo region: masterRegions) {
125          regionSet.remove(region);
126        }
127      }
128    }
129
130    if (regionSet.isEmpty()) {
131      return assignmentMap;
132    }
133
134    try {
135      FavoredNodeAssignmentHelper helper =
136          new FavoredNodeAssignmentHelper(servers, fnm.getRackManager());
137      helper.initialize();
138
139      Set<RegionInfo> systemRegions = FavoredNodesManager.filterNonFNApplicableRegions(regionSet);
140      regionSet.removeAll(systemRegions);
141
142      // Assign all system regions
143      Map<ServerName, List<RegionInfo>> systemAssignments =
144        super.roundRobinAssignment(Lists.newArrayList(systemRegions), servers);
145
146      // Segregate favored and non-favored nodes regions and assign accordingly.
147      Pair<Map<ServerName,List<RegionInfo>>, List<RegionInfo>> segregatedRegions =
148        segregateRegionsAndAssignRegionsWithFavoredNodes(regionSet, servers);
149      Map<ServerName, List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
150      Map<ServerName, List<RegionInfo>> regionsWithoutFN =
151        generateFNForRegionsWithoutFN(helper, segregatedRegions.getSecond());
152
153      // merge the assignment maps
154      mergeAssignmentMaps(assignmentMap, systemAssignments);
155      mergeAssignmentMaps(assignmentMap, regionsWithFavoredNodesMap);
156      mergeAssignmentMaps(assignmentMap, regionsWithoutFN);
157
158    } catch (Exception ex) {
159      throw new HBaseIOException("Encountered exception while doing favored-nodes assignment "
160        + ex + " Falling back to regular assignment", ex);
161    }
162    return assignmentMap;
163  }
164
165  private void mergeAssignmentMaps(Map<ServerName, List<RegionInfo>> assignmentMap,
166      Map<ServerName, List<RegionInfo>> otherAssignments) {
167
168    if (otherAssignments == null || otherAssignments.isEmpty()) {
169      return;
170    }
171
172    for (Entry<ServerName, List<RegionInfo>> entry : otherAssignments.entrySet()) {
173      ServerName sn = entry.getKey();
174      List<RegionInfo> regionsList = entry.getValue();
175      if (assignmentMap.get(sn) == null) {
176        assignmentMap.put(sn, Lists.newArrayList(regionsList));
177      } else {
178        assignmentMap.get(sn).addAll(regionsList);
179      }
180    }
181  }
182
183  private Map<ServerName, List<RegionInfo>> generateFNForRegionsWithoutFN(
184      FavoredNodeAssignmentHelper helper, List<RegionInfo> regions) throws IOException {
185
186    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
187    Map<RegionInfo, List<ServerName>> regionsNoFNMap;
188
189    if (regions.size() > 0) {
190      regionsNoFNMap = helper.generateFavoredNodesRoundRobin(assignmentMap, regions);
191      fnm.updateFavoredNodes(regionsNoFNMap);
192    }
193    return assignmentMap;
194  }
195
196  /*
197   * Return a pair - one with assignments when favored nodes are present and another with regions
198   * without favored nodes.
199   */
200  private Pair<Map<ServerName, List<RegionInfo>>, List<RegionInfo>>
201  segregateRegionsAndAssignRegionsWithFavoredNodes(Collection<RegionInfo> regions,
202      List<ServerName> onlineServers) throws HBaseIOException {
203
204    // Since we expect FN to be present most of the time, lets create map with same size
205    Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes =
206        new HashMap<>(onlineServers.size());
207    List<RegionInfo> regionsWithNoFavoredNodes = new ArrayList<>();
208
209    for (RegionInfo region : regions) {
210      List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
211      ServerName primaryHost = null;
212      ServerName secondaryHost = null;
213      ServerName tertiaryHost = null;
214
215      if (favoredNodes != null && !favoredNodes.isEmpty()) {
216        for (ServerName s : favoredNodes) {
217          ServerName serverWithLegitStartCode = getServerFromFavoredNode(onlineServers, s);
218          if (serverWithLegitStartCode != null) {
219            FavoredNodesPlan.Position position =
220                FavoredNodesPlan.getFavoredServerPosition(favoredNodes, s);
221            if (Position.PRIMARY.equals(position)) {
222              primaryHost = serverWithLegitStartCode;
223            } else if (Position.SECONDARY.equals(position)) {
224              secondaryHost = serverWithLegitStartCode;
225            } else if (Position.TERTIARY.equals(position)) {
226              tertiaryHost = serverWithLegitStartCode;
227            }
228          }
229        }
230        assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost,
231            secondaryHost, tertiaryHost);
232      } else {
233        regionsWithNoFavoredNodes.add(region);
234      }
235    }
236    return new Pair<>(assignmentMapForFavoredNodes, regionsWithNoFavoredNodes);
237  }
238
239  private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes,
240      RegionInfo region, ServerName host) {
241
242    List<RegionInfo> regionsOnServer;
243    if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) {
244      regionsOnServer = Lists.newArrayList();
245      assignmentMapForFavoredNodes.put(host, regionsOnServer);
246    }
247    regionsOnServer.add(region);
248  }
249
250  /*
251   * Get the ServerName for the FavoredNode. Since FN's startcode is -1, we could want to get the
252   * ServerName with the correct start code from the list of provided servers.
253   */
254  private ServerName getServerFromFavoredNode(List<ServerName> servers, ServerName fn) {
255    for (ServerName server : servers) {
256      if (ServerName.isSameAddress(fn, server)) {
257        return server;
258      }
259    }
260    return null;
261  }
262
263  /*
264   * Assign the region to primary if its available. If both secondary and tertiary are available,
265   * assign to the host which has less load. Else assign to secondary or tertiary whichever is
266   * available (in that order).
267   */
268  private void assignRegionToAvailableFavoredNode(
269      Map<ServerName, List<RegionInfo>> assignmentMapForFavoredNodes, RegionInfo region,
270      ServerName primaryHost, ServerName secondaryHost, ServerName tertiaryHost) {
271
272    if (primaryHost != null) {
273      addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost);
274
275    } else if (secondaryHost != null && tertiaryHost != null) {
276
277      // Assign the region to the one with a lower load (both have the desired hdfs blocks)
278      ServerName s;
279      ServerMetrics tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost);
280      ServerMetrics secondaryLoad = super.services.getServerManager().getLoad(secondaryHost);
281      if (secondaryLoad != null && tertiaryLoad != null) {
282        if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
283          s = secondaryHost;
284        } else {
285          s = tertiaryHost;
286        }
287      } else {
288        // We don't have one/more load, lets just choose a random node
289        s = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost;
290      }
291      addRegionToMap(assignmentMapForFavoredNodes, region, s);
292    } else if (secondaryHost != null) {
293      addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost);
294    } else if (tertiaryHost != null) {
295      addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost);
296    } else {
297      // No favored nodes are online, lets assign to BOGUS server
298      addRegionToMap(assignmentMapForFavoredNodes, region, BOGUS_SERVER_NAME);
299    }
300  }
301
302  /*
303   * If we have favored nodes for a region, we will return one of the FN as destination. If
304   * favored nodes are not present for a region, we will generate and return one of the FN as
305   * destination. If we can't generate anything, lets fallback.
306   */
307  @Override
308  public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> servers)
309      throws HBaseIOException {
310
311    if (servers != null && servers.contains(masterServerName)) {
312      if (shouldBeOnMaster(regionInfo)) {
313        metricsBalancer.incrMiscInvocations();
314        return masterServerName;
315      }
316      if (!LoadBalancer.isTablesOnMaster(getConf())) {
317        // Guarantee we do not put any regions on master
318        servers = new ArrayList<>(servers);
319        servers.remove(masterServerName);
320      }
321    }
322
323    ServerName destination = null;
324    if (!FavoredNodesManager.isFavoredNodeApplicable(regionInfo)) {
325      return super.randomAssignment(regionInfo, servers);
326    }
327
328    metricsBalancer.incrMiscInvocations();
329
330    List<ServerName> favoredNodes = fnm.getFavoredNodes(regionInfo);
331    if (favoredNodes == null || favoredNodes.isEmpty()) {
332      // Generate new favored nodes and return primary
333      FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
334      helper.initialize();
335      try {
336        favoredNodes = helper.generateFavoredNodes(regionInfo);
337        updateFavoredNodesForRegion(regionInfo, favoredNodes);
338
339      } catch (IOException e) {
340        LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e);
341        throw new HBaseIOException(e);
342      }
343    }
344
345    List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
346    if (onlineServers.size() > 0) {
347      destination = onlineServers.get(RANDOM.nextInt(onlineServers.size()));
348    }
349
350    boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
351    if (destination == null && alwaysAssign) {
352      LOG.warn("Can't generate FN for region: " + regionInfo + " falling back");
353      destination = super.randomAssignment(regionInfo, servers);
354    }
355    return destination;
356  }
357
358  private void updateFavoredNodesForRegion(RegionInfo regionInfo, List<ServerName> newFavoredNodes)
359      throws IOException {
360    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
361    regionFNMap.put(regionInfo, newFavoredNodes);
362    fnm.updateFavoredNodes(regionFNMap);
363  }
364
365  /*
366   * Reuse BaseLoadBalancer's retainAssignment, but generate favored nodes when its missing.
367   */
368  @Override
369  public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
370      List<ServerName> servers) throws HBaseIOException {
371
372    Map<ServerName, List<RegionInfo>> assignmentMap = Maps.newHashMap();
373    Map<ServerName, List<RegionInfo>> result = super.retainAssignment(regions, servers);
374    if (result == null || result.isEmpty()) {
375      LOG.warn("Nothing to assign to, probably no servers or no regions");
376      return null;
377    }
378
379    // Guarantee not to put other regions on master
380    if (servers != null && servers.contains(masterServerName)) {
381      servers = new ArrayList<>(servers);
382      servers.remove(masterServerName);
383    }
384
385    // Lets check if favored nodes info is in META, if not generate now.
386    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf());
387    helper.initialize();
388
389    LOG.debug("Generating favored nodes for regions missing them.");
390    Map<RegionInfo, List<ServerName>> regionFNMap = Maps.newHashMap();
391
392    try {
393      for (Entry<ServerName, List<RegionInfo>> entry : result.entrySet()) {
394
395        ServerName sn = entry.getKey();
396        ServerName primary = ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE);
397
398        for (RegionInfo hri : entry.getValue()) {
399
400          if (FavoredNodesManager.isFavoredNodeApplicable(hri)) {
401            List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
402            if (favoredNodes == null || favoredNodes.size() < FAVORED_NODES_NUM) {
403
404              LOG.debug("Generating favored nodes for: " + hri + " with primary: " + primary);
405              ServerName[] secondaryAndTertiaryNodes = helper.getSecondaryAndTertiary(hri, primary);
406              if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
407                List<ServerName> newFavoredNodes = Lists.newArrayList();
408                newFavoredNodes.add(primary);
409                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
410                    secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
411                newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
412                    secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
413                regionFNMap.put(hri, newFavoredNodes);
414                addRegionToMap(assignmentMap, hri, sn);
415
416              } else {
417                throw new HBaseIOException("Cannot generate secondary/tertiary FN for " + hri
418                  + " generated "
419                  + (secondaryAndTertiaryNodes != null ? secondaryAndTertiaryNodes : " nothing"));
420              }
421            } else {
422              List<ServerName> onlineFN = getOnlineFavoredNodes(servers, favoredNodes);
423              if (onlineFN.isEmpty()) {
424                // All favored nodes are dead, lets assign it to BOGUS
425                addRegionToMap(assignmentMap, hri, BOGUS_SERVER_NAME);
426              } else {
427                // Is primary not on FN? Less likely, but we can still take care of this.
428                if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
429                  addRegionToMap(assignmentMap, hri, sn);
430                } else {
431                  ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
432                  LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
433                    + " current: " + sn + " moving to: " + destination);
434                  addRegionToMap(assignmentMap, hri, destination);
435                }
436              }
437            }
438          } else {
439            addRegionToMap(assignmentMap, hri, sn);
440          }
441        }
442      }
443
444      if (!regionFNMap.isEmpty()) {
445        LOG.debug("Updating FN in meta for missing regions, count: " + regionFNMap.size());
446        fnm.updateFavoredNodes(regionFNMap);
447      }
448
449    } catch (IOException e) {
450      throw new HBaseIOException("Cannot generate/update FN for regions: " + regionFNMap.keySet());
451    }
452
453    return assignmentMap;
454  }
455
456  /*
457   * Return list of favored nodes that are online.
458   */
459  private List<ServerName> getOnlineFavoredNodes(List<ServerName> onlineServers,
460      List<ServerName> serversWithoutStartCodes) {
461    if (serversWithoutStartCodes == null) {
462      return null;
463    } else {
464      List<ServerName> result = Lists.newArrayList();
465      for (ServerName sn : serversWithoutStartCodes) {
466        for (ServerName online : onlineServers) {
467          if (ServerName.isSameAddress(sn, online)) {
468            result.add(online);
469          }
470        }
471      }
472      return result;
473    }
474  }
475
476  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
477    return this.fnm.getFavoredNodes(regionInfo);
478  }
479
480  /*
481   * Generate Favored Nodes for daughters during region split.
482   *
483   * If the parent does not have FN, regenerates them for the daughters.
484   *
485   * If the parent has FN, inherit two FN from parent for each daughter and generate the remaining.
486   * The primary FN for both the daughters should be the same as parent. Inherit the secondary
487   * FN from the parent but keep it different for each daughter. Choose the remaining FN
488   * randomly. This would give us better distribution over a period of time after enough splits.
489   */
490  @Override
491  public void generateFavoredNodesForDaughter(List<ServerName> servers, RegionInfo parent,
492      RegionInfo regionA, RegionInfo regionB) throws IOException {
493
494    Map<RegionInfo, List<ServerName>> result = new HashMap<>();
495    FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, rackManager);
496    helper.initialize();
497
498    List<ServerName> parentFavoredNodes = fnm.getFavoredNodes(parent);
499    if (parentFavoredNodes == null) {
500      LOG.debug("Unable to find favored nodes for parent, " + parent
501          + " generating new favored nodes for daughter");
502      result.put(regionA, helper.generateFavoredNodes(regionA));
503      result.put(regionB, helper.generateFavoredNodes(regionB));
504
505    } else {
506
507      // Lets get the primary and secondary from parent for regionA
508      Set<ServerName> regionAFN =
509          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, SECONDARY);
510      result.put(regionA, Lists.newArrayList(regionAFN));
511
512      // Lets get the primary and tertiary from parent for regionB
513      Set<ServerName> regionBFN =
514          getInheritedFNForDaughter(helper, parentFavoredNodes, PRIMARY, TERTIARY);
515      result.put(regionB, Lists.newArrayList(regionBFN));
516    }
517
518    fnm.updateFavoredNodes(result);
519  }
520
521  private Set<ServerName> getInheritedFNForDaughter(FavoredNodeAssignmentHelper helper,
522      List<ServerName> parentFavoredNodes, Position primary, Position secondary)
523      throws IOException {
524
525    Set<ServerName> daughterFN = Sets.newLinkedHashSet();
526    if (parentFavoredNodes.size() >= primary.ordinal()) {
527      daughterFN.add(parentFavoredNodes.get(primary.ordinal()));
528    }
529
530    if (parentFavoredNodes.size() >= secondary.ordinal()) {
531      daughterFN.add(parentFavoredNodes.get(secondary.ordinal()));
532    }
533
534    while (daughterFN.size() < FAVORED_NODES_NUM) {
535      ServerName newNode = helper.generateMissingFavoredNode(Lists.newArrayList(daughterFN));
536      daughterFN.add(newNode);
537    }
538    return daughterFN;
539  }
540
541  /*
542   * Generate favored nodes for a region during merge. Choose the FN from one of the sources to
543   * keep it simple.
544   */
545  @Override
546  public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] mergeParents)
547      throws IOException {
548    updateFavoredNodesForRegion(merged, fnm.getFavoredNodes(mergeParents[0]));
549  }
550
551  /*
552   * Pick favored nodes with the highest locality for a region with lowest locality.
553   */
554  private class FavoredNodeLocalityPicker extends CandidateGenerator {
555
556    @Override
557    protected Cluster.Action generate(Cluster cluster) {
558
559      int thisServer = pickRandomServer(cluster);
560      int thisRegion;
561      if (thisServer == -1) {
562        LOG.trace("Could not pick lowest local region server");
563        return Cluster.NullAction;
564      } else {
565        // Pick lowest local region on this server
566        thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
567      }
568      if (thisRegion == -1) {
569        if (cluster.regionsPerServer[thisServer].length > 0) {
570          LOG.trace("Could not pick lowest local region even when region server held "
571            + cluster.regionsPerServer[thisServer].length + " regions");
572        }
573        return Cluster.NullAction;
574      }
575
576      RegionInfo hri = cluster.regions[thisRegion];
577      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
578      int otherServer;
579      if (favoredNodes == null) {
580        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
581          otherServer = pickOtherRandomServer(cluster, thisServer);
582        } else {
583          // No FN, ignore
584          LOG.trace("Ignoring, no favored nodes for region: " + hri);
585          return Cluster.NullAction;
586        }
587      } else {
588        // Pick other favored node with the highest locality
589        otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer);
590      }
591      return getAction(thisServer, thisRegion, otherServer, -1);
592    }
593
594    private int getDifferentFavoredNode(Cluster cluster, List<ServerName> favoredNodes,
595        int currentServer) {
596      List<Integer> fnIndex = new ArrayList<>();
597      for (ServerName sn : favoredNodes) {
598        if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) {
599          fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort()));
600        }
601      }
602      float locality = 0;
603      int highestLocalRSIndex = -1;
604      for (Integer index : fnIndex) {
605        if (index != currentServer) {
606          float temp = cluster.localityPerServer[index];
607          if (temp >= locality) {
608            locality = temp;
609            highestLocalRSIndex = index;
610          }
611        }
612      }
613      return highestLocalRSIndex;
614    }
615
616    private int pickLowestLocalRegionOnServer(Cluster cluster, int server) {
617      return cluster.getLowestLocalityRegionOnServer(server);
618    }
619  }
620
621  /*
622   * This is like LoadCandidateGenerator, but we choose appropriate FN for the region on the
623   * most loaded server.
624   */
625  class FavoredNodeLoadPicker extends CandidateGenerator {
626
627    @Override
628    Cluster.Action generate(Cluster cluster) {
629      cluster.sortServersByRegionCount();
630      int thisServer = pickMostLoadedServer(cluster);
631      int thisRegion = pickRandomRegion(cluster, thisServer, 0);
632      RegionInfo hri = cluster.regions[thisRegion];
633      int otherServer;
634      List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
635      if (favoredNodes == null) {
636        if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) {
637          otherServer = pickLeastLoadedServer(cluster, thisServer);
638        } else {
639          return Cluster.NullAction;
640        }
641      } else {
642        otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer);
643      }
644      return getAction(thisServer, thisRegion, otherServer, -1);
645    }
646
647    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
648      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
649      int index;
650      for (index = 0; index < servers.length ; index++) {
651        if ((servers[index] != null) && servers[index] != thisServer) {
652          break;
653        }
654      }
655      return servers[index];
656    }
657
658    private int pickLeastLoadedFNServer(final Cluster cluster, List<ServerName> favoredNodes,
659        int currentServerIndex) {
660      List<Integer> fnIndex = new ArrayList<>();
661      for (ServerName sn : favoredNodes) {
662        if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) {
663          fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort()));
664        }
665      }
666      int leastLoadedFN = -1;
667      int load = Integer.MAX_VALUE;
668      for (Integer index : fnIndex) {
669        if (index != currentServerIndex) {
670          int temp = cluster.getNumRegions(index);
671          if (temp < load) {
672            load = temp;
673            leastLoadedFN = index;
674          }
675        }
676      }
677      return leastLoadedFN;
678    }
679
680    private int pickMostLoadedServer(final Cluster cluster) {
681      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
682      int index;
683      for (index = servers.length - 1; index > 0 ; index--) {
684        if (servers[index] != null) {
685          break;
686        }
687      }
688      return servers[index];
689    }
690  }
691
692  /*
693   * For all regions correctly assigned to favored nodes, we just use the stochastic balancer
694   * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
695   */
696  @Override
697  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
698      List<RegionInfo>> clusterState) {
699
700    if (this.services != null) {
701
702      List<RegionPlan> regionPlans = Lists.newArrayList();
703      Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
704      int misplacedRegions = 0;
705
706      for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
707        ServerName current = entry.getKey();
708        List<RegionInfo> regions = Lists.newArrayList();
709        correctAssignments.put(current, regions);
710
711        for (RegionInfo hri : entry.getValue()) {
712          List<ServerName> favoredNodes = fnm.getFavoredNodes(hri);
713          if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null ||
714              !FavoredNodesManager.isFavoredNodeApplicable(hri)) {
715            regions.add(hri);
716
717          } else {
718            // No favored nodes, lets unassign.
719            LOG.warn("Region not on favored nodes, unassign. Region: " + hri
720              + " current: " + current + " favored nodes: " + favoredNodes);
721            try {
722              this.services.getAssignmentManager().unassign(hri);
723            } catch (IOException e) {
724              LOG.warn("Failed unassign", e);
725              continue;
726            }
727            RegionPlan rp = new RegionPlan(hri, null, null);
728            regionPlans.add(rp);
729            misplacedRegions++;
730          }
731        }
732      }
733      LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
734      List<RegionPlan> regionPlansFromBalance = super.balanceCluster(correctAssignments);
735      if (regionPlansFromBalance != null) {
736        regionPlans.addAll(regionPlansFromBalance);
737      }
738      return regionPlans;
739    } else {
740      return super.balanceCluster(clusterState);
741    }
742  }
743}
744