/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.favored;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;

/**
 * Helper class for FavoredNodeLoadBalancer that has all the intelligence for racks, meta scans,
 * etc. Instantiated by the FavoredNodeLoadBalancer when needed (from within calls like
 * FavoredNodeLoadBalancer#randomAssignment(RegionInfo, List)). All updates to favored nodes should
 * only be done from FavoredNodesManager and not through this helper class (except for tests).
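 * <p>
 * A minimal usage sketch (the {@code servers} list, {@code conf}, and {@code regionInfo} below
 * are hypothetical call-site values, not part of this class):
 *
 * <pre>
 * FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, conf);
 * helper.initialize(); // always initialize before using any placement method
 * if (helper.canPlaceFavoredNodes()) {
 *   List&lt;ServerName&gt; favoredNodes = helper.generateFavoredNodes(regionInfo);
 * }
 * </pre>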
 */
@InterfaceAudience.Private
public class FavoredNodeAssignmentHelper {
  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeAssignmentHelper.class);
  private RackManager rackManager;
  private Map<String, List<ServerName>> rackToRegionServerMap;
  private List<String> uniqueRackList;
  // This map serves as a cache for server-to-rack lookups. The number of region
  // server entries may not match what is in servers.
  private Map<String, String> regionServerToRackMap;
  private List<ServerName> servers;
  public static final byte[] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
  public final static short FAVORED_NODES_NUM = 3;
  public final static short MAX_ATTEMPTS_FN_GENERATION = 10;

  public FavoredNodeAssignmentHelper(final List<ServerName> servers, Configuration conf) {
    this(servers, new RackManager(conf));
  }

  public FavoredNodeAssignmentHelper(final List<ServerName> servers,
    final RackManager rackManager) {
    this.servers = servers;
    this.rackManager = rackManager;
    this.rackToRegionServerMap = new HashMap<>();
    this.regionServerToRackMap = new HashMap<>();
    this.uniqueRackList = new ArrayList<>();
  }

  // Always call initialize() after constructing a FavoredNodeAssignmentHelper.
  public void initialize() {
    for (ServerName sn : this.servers) {
      String rackName = getRackOfServer(sn);
      List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
      if (serverList == null) {
        serverList = Lists.newArrayList();
        // Add the current rack to the unique rack list
        this.uniqueRackList.add(rackName);
        this.rackToRegionServerMap.put(rackName, serverList);
      }
      boolean alreadyPresent = false;
      for (ServerName serverName : serverList) {
        if (ServerName.isSameAddress(sn, serverName)) {
          // The server is already present, ignore.
          alreadyPresent = true;
          break;
        }
      }
      if (!alreadyPresent) {
        serverList.add(sn);
      }
      this.regionServerToRackMap.put(sn.getHostname(), rackName);
    }
  }


  /**
   * Update meta table with favored nodes info
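   * <p>
   * A hedged sketch of a call site (the region, servers, and connection are hypothetical):
   *
   * <pre>
   * Map&lt;RegionInfo, List&lt;ServerName&gt;&gt; regionToFavoredNodes = new HashMap&lt;&gt;();
   * regionToFavoredNodes.put(regionInfo, Lists.newArrayList(sn1, sn2, sn3));
   * FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, connection);
   * </pre>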
   * @param regionToFavoredNodes map of RegionInfo's to their favored nodes
   * @param connection           connection to be used
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
    throws IOException {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
      Put put = makePut(entry.getKey(), entry.getValue());
      if (put != null) {
        puts.add(put);
      }
    }
    try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      table.put(puts);
    }
    LOG.info("Added {} region favored nodes in META", puts.size());
  }

  /**
   * Update meta table with favored nodes info
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
    // Write the region assignments to the meta table.
    // TODO: The above override takes a Connection rather than a Configuration only because the
    // Connection is a short-circuit connection. That is not going to be good in all cases, e.g.
    // when master and meta are not colocated. Fix when this favored nodes feature is actually
    // used someday.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
    }
  }

  private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
    throws IOException {
    if (CollectionUtils.isEmpty(favoredNodeList)) {
      return null;
    }
    long time = EnvironmentEdgeManager.currentTime();
    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
    byte[] favoredNodes = getFavoredNodes(favoredNodeList);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
      .setType(Cell.Type.Put).setValue(favoredNodes).build());
    LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
      favoredNodeList);
    return put;
  }

  /**
   * Convert PB bytes to ServerName.
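   * <p>
   * A round-trip sketch with {@link #getFavoredNodes(List)} (the server list is hypothetical):
   *
   * <pre>
   * byte[] bytes = FavoredNodeAssignmentHelper.getFavoredNodes(serverList);
   * ServerName[] parsed = FavoredNodeAssignmentHelper.getFavoredNodesList(bytes);
   * // parsed entries carry ServerName.NON_STARTCODE as their startcode
   * </pre>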
   * @param favoredNodes The PB'ed bytes of favored nodes
   * @return the array of {@link ServerName} for the byte array of favored nodes.
   */
  public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
    FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
    List<HBaseProtos.ServerName> protoNodes = f.getFavoredNodeList();
    ServerName[] servers = new ServerName[protoNodes.size()];
    int i = 0;
    for (HBaseProtos.ServerName node : protoNodes) {
      servers[i++] = ProtobufUtil.toServerName(node);
    }
    return servers;
  }

187  /**
188   * n * @return PB'ed bytes of {@link FavoredNodes} generated by the server list.
189   */
  public static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
    FavoredNodes.Builder f = FavoredNodes.newBuilder();
    for (ServerName s : serverAddrList) {
      HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
      b.setHostName(s.getHostname());
      b.setPort(s.getPort());
      b.setStartCode(ServerName.NON_STARTCODE);
      f.addFavoredNode(b.build());
    }
    return f.build().toByteArray();
  }

  // Place the regions round-robin across the racks, picking one server from each
  // rack at a time for choosing a primary. Start with a random rack, and a random
  // server from every rack. If a rack doesn't have enough servers, it will go to
  // the next rack, and so on.
  // For example, with 4 racks (r1..r4) of 8 servers (s1..s8) each, one possible
  // placement could be r2:s5, r3:s5, r4:s5, r1:s5, r2:s6, r3:s6, ...
  // If one rack had fewer servers, say r3 with 3 servers, one possible
  // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3>, ...
  // The regions should be distributed proportionally to the rack sizes.
  public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignmentMap,
    Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
    List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
    rackList.addAll(rackToRegionServerMap.keySet());
    int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
    int maxRackSize = 0;
    for (Map.Entry<String, List<ServerName>> r : rackToRegionServerMap.entrySet()) {
      if (r.getValue().size() > maxRackSize) {
        maxRackSize = r.getValue().size();
      }
    }
    int numIterations = 0;
    // Initialize the current processing host index.
    int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
    for (RegionInfo regionInfo : regions) {
      List<ServerName> currentServerList;
      String rackName;
      while (true) {
        rackName = rackList.get(rackIndex);
        numIterations++;
        // Get the server list for the current rack
        currentServerList = rackToRegionServerMap.get(rackName);

        if (serverIndex >= currentServerList.size()) { // not enough machines in this rack
          if (numIterations % rackList.size() == 0) {
            if (++serverIndex >= maxRackSize) {
              serverIndex = 0;
            }
          }
          if ((++rackIndex) >= rackList.size()) {
            rackIndex = 0; // reset the rack index to 0
          }
        } else {
          break;
        }
      }

      // Get the region server currently being processed
      ServerName currentServer = currentServerList.get(serverIndex);

      // Place the current region with the current primary region server
      primaryRSMap.put(regionInfo, currentServer);
      if (assignmentMap != null) {
        assignmentMap.computeIfAbsent(currentServer, k -> new ArrayList<>()).add(regionInfo);
      }

      // Set the next processing index
      if (numIterations % rackList.size() == 0) {
        ++serverIndex;
      }
      if ((++rackIndex) >= rackList.size()) {
        rackIndex = 0; // reset the rack index to 0
      }
    }
  }

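  /**
   * For each region in {@code primaryRSMap}, pick a secondary and tertiary favored node via
   * {@link #getSecondaryAndTertiary(RegionInfo, ServerName)}. Regions whose generation fails are
   * logged at WARN and left out of the returned map.
   * @param primaryRSMap map of regions to their chosen primary region server
   * @return map of regions to a two-element array of secondary and tertiary favored nodes
   */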
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryRS(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
    for (Map.Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Create the secondary and tertiary region server pair object.
        ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Placed the secondary and tertiary region servers for region {}",
            regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region {}",
          regionInfo.getRegionNameAsString(), e);
      }
    }
    return secondaryAndTertiaryMap;
  }

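  /**
   * Pick the secondary and tertiary favored nodes for a region given its primary. Delegates to
   * the single-rack or multi-rack strategy based on the number of racks known to this helper.
   * @param regionInfo the region for which favored nodes are generated
   * @param primaryRS  the primary favored node
   * @return a two-element array of secondary and tertiary nodes, or null if they cannot be placed
   */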
  public ServerName[] getSecondaryAndTertiary(RegionInfo regionInfo, ServerName primaryRS)
    throws IOException {
    ServerName[] favoredNodes;
    // Get the rack for the primary region server
    String primaryRack = getRackOfServer(primaryRS);

    if (getTotalNumberOfRacks() == 1) {
      favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
    } else {
      favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack);
    }
    return favoredNodes;
  }

  private Map<ServerName, Set<RegionInfo>>
    mapRSToPrimaries(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> primaryServerMap = new HashMap<>();
    for (Entry<RegionInfo, ServerName> e : primaryRSMap.entrySet()) {
      primaryServerMap.computeIfAbsent(e.getValue(), k -> new HashSet<>()).add(e.getKey());
    }
    return primaryServerMap;
  }

  /**
   * For regions that share the primary, avoid placing the secondary and tertiary on the same RS.
   * Used for generating new assignments for the primary/secondary/tertiary RegionServers.
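   * <p>
   * A hedged usage sketch (the map contents are hypothetical):
   *
   * <pre>
   * Map&lt;RegionInfo, ServerName&gt; primaryRSMap = new HashMap&lt;&gt;();
   * primaryRSMap.put(regionInfo, primaryServer);
   * Map&lt;RegionInfo, ServerName[]&gt; pairs =
   *   helper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
   * </pre>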
   * @param primaryRSMap the map of regions to their primary region servers
   * @return the map of regions to the servers the region-files should be hosted on
   */
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryWithRestrictions(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> serverToPrimaries = mapRSToPrimaries(primaryRSMap);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Get the rack for the primary region server
        String primaryRack = getRackOfServer(primaryRS);
        ServerName[] favoredNodes = null;
        if (getTotalNumberOfRacks() == 1) {
          // Single rack case: have to pick the secondary and tertiary
          // from the same rack
          favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
        } else {
          favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries, secondaryAndTertiaryMap,
            primaryRack, primaryRS, regionInfo);
        }
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Placed the secondary and tertiary region servers for region {}",
            regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region {}",
          regionInfo.getRegionNameAsString(), e);
      }
    }
    return secondaryAndTertiaryMap;
  }

  private ServerName[] multiRackCaseWithRestrictions(
    Map<ServerName, Set<RegionInfo>> serverToPrimaries,
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap, String primaryRack, ServerName primaryRS,
    RegionInfo regionInfo) throws IOException {
    // Randomly choose the secondary and tertiary region servers from a rack
    // other than the primary's.
    Set<String> rackSkipSet = new HashSet<>();
    rackSkipSet.add(primaryRack);
    String secondaryRack = getOneRandomRack(rackSkipSet);
    List<ServerName> serverList = getServersFromRack(secondaryRack);
    Set<ServerName> serverSet = new HashSet<>(serverList);
    ServerName[] favoredNodes;
    if (serverList.size() >= 2) {
      // Randomly pick two servers from this secondary rack.
      // Skip the secondary for the tertiary placement, and skip
      // the servers which already share the primary.
      Set<RegionInfo> primaries = serverToPrimaries.get(primaryRS);
      Set<ServerName> skipServerSet = new HashSet<>();
      while (true) {
        ServerName[] secondaryAndTertiary = null;
        if (primaries.size() > 1) {
          // Check where the other regions' secondaries and tertiaries are
          for (RegionInfo primary : primaries) {
            secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
            if (secondaryAndTertiary != null) {
              if (getRackOfServer(secondaryAndTertiary[0]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[0]);
              }
              if (getRackOfServer(secondaryAndTertiary[1]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[1]);
              }
            }
          }
        }
        if (skipServerSet.size() + 2 <= serverSet.size()) {
          break;
        }
        skipServerSet.clear();
        rackSkipSet.add(secondaryRack);
        // We used all racks
        if (rackSkipSet.size() == getTotalNumberOfRacks()) {
          // Remove the last two added and break; guard against the case where
          // no secondary/tertiary pair was seen for this rack.
          if (secondaryAndTertiary != null) {
            skipServerSet.remove(secondaryAndTertiary[0]);
            skipServerSet.remove(secondaryAndTertiary[1]);
          }
          break;
        }
        secondaryRack = getOneRandomRack(rackSkipSet);
        serverList = getServersFromRack(secondaryRack);
        serverSet = new HashSet<>(serverList);
      }

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
      skipServerSet.add(secondaryRS);
      // Place the tertiary RS
      ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary and tertiary region server for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
    } else {
      // Pick the secondary RS from this secondary rack
      // and pick the tertiary from another random rack
      favoredNodes = new ServerName[2];
      ServerName secondary = getOneRandomServer(secondaryRack);
      favoredNodes[0] = secondary;

      // Pick the tertiary
      if (getTotalNumberOfRacks() == 2) {
        // Pick the tertiary from the same rack as the primary RS
        Set<ServerName> serverSkipSet = new HashSet<>();
        serverSkipSet.add(primaryRS);
        favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
      } else {
        // Pick the tertiary from another rack
        rackSkipSet.add(secondaryRack);
        String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
        favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
      }
    }
    return favoredNodes;
  }

  private ServerName[] singleRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {
    // Single rack case: have to pick the secondary and tertiary
    // from the same rack
    List<ServerName> serverList = getServersFromRack(primaryRack);
    if ((serverList == null) || (serverList.size() <= 2)) {
      // Not enough region servers in the rack: cannot place the favored nodes
      // on any other server
      return null;
    } else {
      // Randomly select two region servers from the server list and make sure
      // they do not overlap with the primary region server
      Set<ServerName> serverSkipSet = new HashSet<>();
      serverSkipSet.add(primaryRS);

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet);
      // Skip the secondary for the tertiary placement
      serverSkipSet.add(secondaryRS);
      ServerName tertiaryRS = getOneRandomServer(primaryRack, serverSkipSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary, tertiary favored node for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      ServerName[] favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
      return favoredNodes;
    }
  }

  /**
   * Place secondary and tertiary nodes in a multi rack case. If there are only two racks, then we
   * try to place the secondary and tertiary on a different rack than the primary. But if the other
   * rack has only one region server, then we place primary and tertiary on one rack and secondary
   * on another. The aim is to distribute the three favored nodes across >= 2 racks. TODO: see how
   * we can use the generateMissingFavoredNodeMultiRack API here.
   * @param regionInfo  Region for which we are trying to generate FN
   * @param primaryRS   The primary favored node.
   * @param primaryRack The rack of the primary favored node.
   * @return Array containing secondary and tertiary favored nodes.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private ServerName[] multiRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {
    List<ServerName> favoredNodes = Lists.newArrayList(primaryRS);
    // Create the secondary and tertiary pair
    ServerName secondaryRS = generateMissingFavoredNodeMultiRack(favoredNodes);
    favoredNodes.add(secondaryRS);
    String secondaryRack = getRackOfServer(secondaryRS);

    ServerName tertiaryRS;
    if (primaryRack.equals(secondaryRack)) {
      tertiaryRS = generateMissingFavoredNode(favoredNodes);
    } else {
      // Try to place the tertiary in the secondary RS rack, else place it on the primary rack.
      tertiaryRS = getOneRandomServer(secondaryRack, Sets.newHashSet(secondaryRS));
      if (tertiaryRS == null) {
        tertiaryRS = getOneRandomServer(primaryRack, Sets.newHashSet(primaryRS));
      }
      // We couldn't find anything in the secondary rack, get any FN
      if (tertiaryRS == null) {
        tertiaryRS = generateMissingFavoredNode(Lists.newArrayList(primaryRS, secondaryRS));
      }
    }
    return new ServerName[] { secondaryRS, tertiaryRS };
  }

  public boolean canPlaceFavoredNodes() {
    return (this.servers.size() >= FAVORED_NODES_NUM);
  }

  private int getTotalNumberOfRacks() {
    return this.uniqueRackList.size();
  }

  private List<ServerName> getServersFromRack(String rack) {
    return this.rackToRegionServerMap.get(rack);
  }

  /**
   * Gets a random server from the specified rack, skipping anything specified.
   * @param rack          the rack from which a server is needed
   * @param skipServerSet set of servers the result must not belong to
   */
  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
    // Is the rack valid? Do we recognize it?
    if (rack == null || getServersFromRack(rack) == null || getServersFromRack(rack).isEmpty()) {
      return null;
    }

    // Let's use a set so we can eliminate duplicates
    Set<StartcodeAgnosticServerName> serversToChooseFrom = Sets.newHashSet();
    for (ServerName sn : getServersFromRack(rack)) {
      serversToChooseFrom.add(StartcodeAgnosticServerName.valueOf(sn));
    }

    if (skipServerSet != null && !skipServerSet.isEmpty()) {
      for (ServerName sn : skipServerSet) {
        serversToChooseFrom.remove(StartcodeAgnosticServerName.valueOf(sn));
      }
      // Do we have any servers left to choose from?
      if (serversToChooseFrom.isEmpty()) {
        return null;
      }
    }

    ServerName randomServer = null;
    int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
    int j = 0;
    for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
      if (j == randomIndex) {
        randomServer = sn;
        break;
      }
      j++;
    }

    if (randomServer != null) {
      return ServerName.valueOf(randomServer.getAddress(), randomServer.getStartcode());
    } else {
      return null;
    }
  }

  private ServerName getOneRandomServer(String rack) throws IOException {
    return this.getOneRandomServer(rack, null);
  }

  String getOneRandomRack(Set<String> skipRackSet) throws IOException {
    if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
      throw new IOException("Cannot randomly pick another random rack");
    }

    String randomRack;
    do {
      int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
      randomRack = this.uniqueRackList.get(randomIndex);
    } while (skipRackSet.contains(randomRack));

    return randomRack;
  }

  public static String getFavoredNodesAsString(List<ServerName> nodes) {
    StringBuilder strBuf = new StringBuilder();
    int i = 0;
    for (ServerName node : nodes) {
      strBuf.append(node.getAddress());
      if (++i != nodes.size()) {
        strBuf.append(";");
      }
    }
    return strBuf.toString();
  }

  /*
   * Generates a missing favored node based on the input favored nodes. This helps to generate new
   * FN when there are already 2 FNs and we need a third one. E.g., while generating a new FN for
   * split daughters after inheriting 2 FNs from the parent. If the cluster has only one rack, it
   * generates from the same rack. If the cluster has multiple racks, it ensures the new FN
   * respects the rack constraints similar to HDFS. E.g., if there are 3 FNs, they will be spread
   * across 2 racks.
   */
  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, null);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, null);
    }
  }

  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, excludeNodes);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, excludeNodes);
    }
  }

  /*
   * Generate an FN for a single rack scenario, not generating it from one of the excluded nodes.
   * Helps when we would like to find a replacement node.
   */
  private ServerName generateMissingFavoredNodeSingleRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    ServerName newServer = null;
    Set<ServerName> excludeFNSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && !excludeNodes.isEmpty()) {
      excludeFNSet.addAll(excludeNodes);
    }
    if (favoredNodes.size() < FAVORED_NODES_NUM) {
      newServer = this.getOneRandomServer(this.uniqueRackList.get(0), excludeFNSet);
    }
    return newServer;
  }

  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes)
    throws IOException {
    return generateMissingFavoredNodeMultiRack(favoredNodes, null);
  }

  /*
   * Generates a missing FN based on the input favoredNodes and also the nodes to be skipped. Get
   * the current layout of favored nodes arrangement and nodes to be excluded and get a random node
   * that goes with HDFS block placement. E.g., if the existing nodes are on one rack, generate one
   * from another rack. We exclude as much as possible so the random selection has more chance to
   * generate a node within a few iterations, ideally 1.
   */
  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    Set<String> racks = Sets.newHashSet();
    Map<String, Set<ServerName>> rackToFNMapping = new HashMap<>();

    // Let's understand the current rack distribution of the FN
    for (ServerName sn : favoredNodes) {
      String rack = getRackOfServer(sn);
      racks.add(rack);
      rackToFNMapping.computeIfAbsent(rack, k -> Sets.newHashSet()).add(sn);
    }

    // What racks should be skipped while getting an FN?
    Set<String> skipRackSet = Sets.newHashSet();

    /*
     * If all the FNs are on the same rack, then we don't want to generate another FN on that
     * rack. If that rack fails, the region would be unavailable.
     */
    if (racks.size() == 1 && favoredNodes.size() > 1) {
      skipRackSet.add(racks.iterator().next());
    }

    /*
     * If there are no free nodes on the existing racks, we should skip those racks too. We can
     * reduce the number of iterations for FN selection.
     */
    for (String rack : racks) {
      if (
        getServersFromRack(rack) != null
          && rackToFNMapping.get(rack).size() == getServersFromRack(rack).size()
      ) {
        skipRackSet.add(rack);
      }
    }

    Set<ServerName> favoredNodeSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && !excludeNodes.isEmpty()) {
      favoredNodeSet.addAll(excludeNodes);
    }

    /*
     * Let's get a random rack by excluding skipRackSet and generate a random FN from that rack.
     */
    int i = 0;
    Set<String> randomRacks = Sets.newHashSet();
    ServerName newServer = null;
    do {
      String randomRack = this.getOneRandomRack(skipRackSet);
      newServer = this.getOneRandomServer(randomRack, favoredNodeSet);
      randomRacks.add(randomRack);
      i++;
    } while ((i < MAX_ATTEMPTS_FN_GENERATION) && (newServer == null));

    if (newServer == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(String.format(
          "Unable to generate additional favored nodes for %s after "
            + "considering racks %s and skip rack %s with a unique rack list of %s and rack "
            + "to RS map of %s and RS to rack map of %s",
          StringUtils.join(favoredNodes, ","), randomRacks, skipRackSet, uniqueRackList,
          rackToRegionServerMap, regionServerToRackMap));
      }
      throw new IOException(
        "Unable to generate additional favored nodes for " + StringUtils.join(favoredNodes, ","));
    }
    return newServer;
  }

  /*
   * Generate favored nodes for a region. Choose a random server as the primary and then choose
   * the secondary and tertiary FNs so they are spread across two racks.
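   *
   * A hedged usage sketch (the region is hypothetical):
   *
   *   List<ServerName> fn = helper.generateFavoredNodes(regionInfo);
   *   // fn has FAVORED_NODES_NUM entries, each with ServerName.NON_STARTCODE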
   */
  public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {
    List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
    ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));

    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
    primaryRSMap.put(hri, primary);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);
    ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri);
    if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
      for (ServerName sn : secondaryAndTertiaryNodes) {
        favoredNodesForRegion.add(ServerName.valueOf(sn.getAddress(), ServerName.NON_STARTCODE));
      }
      return favoredNodesForRegion;
    } else {
      throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes.");
    }
  }

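  /**
   * Generate favored nodes for a set of regions, picking the primaries round-robin across racks
   * and then placing the secondary and tertiary for each region.
   * @param assignmentMap map to be filled with the primary assignments; may be null
   * @param regions       the regions to generate favored nodes for
   * @return map of regions to their favored node lists, or null if {@code regions} is empty
   * @throws HBaseIOException if there are not enough nodes to generate favored nodes
   */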
  public Map<RegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
    if (!regions.isEmpty()) {
      if (canPlaceFavoredNodes()) {
        Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
        // Let's try to have an equal distribution for the primary favored nodes
        placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
        return generateFavoredNodes(primaryRSMap);
      } else {
        throw new HBaseIOException("Not enough nodes to generate favored nodes");
      }
    }
    return null;
  }

  /*
   * Generate favored nodes for a set of regions when we know where they are currently hosted.
   */
  private Map<RegionInfo, List<ServerName>>
    generateFavoredNodes(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, List<ServerName>> generatedFavNodes = new HashMap<>();
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
      RegionInfo region = entry.getKey();
      ServerName primarySN = entry.getValue();
      favoredNodesForRegion
        .add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(), NON_STARTCODE));
      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
      if (secondaryAndTertiaryNodes != null) {
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
          secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
          secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
      }
      generatedFavNodes.put(region, favoredNodesForRegion);
    }
    return generatedFavNodes;
  }

  /**
   * Get the rack of a server from the local cache when present; this saves a lookup by the
   * RackManager.
   */
  private String getRackOfServer(ServerName sn) {
    if (this.regionServerToRackMap.containsKey(sn.getHostname())) {
      return this.regionServerToRackMap.get(sn.getHostname());
    } else {
      String rack = this.rackManager.getRack(sn);
      this.regionServerToRackMap.put(sn.getHostname(), rack);
      return rack;
    }
  }

  public static int getDataNodePort(Configuration conf) {
    HdfsConfiguration.init();
    Configuration dnConf = new HdfsConfiguration(conf);
    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
    return dnPort;
  }
}