/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.favored;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;

/**
 * Helper class for FavoredNodeLoadBalancer that has all the intelligence for racks, meta scans,
 * etc. Instantiated by the FavoredNodeLoadBalancer when needed (from within calls like
 * FavoredNodeLoadBalancer#randomAssignment(RegionInfo, List)). All updates to favored nodes
 * should only be done from FavoredNodesManager and not through this helper class (except for
 * tests).
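 * <p>
 * A minimal usage sketch; the server list, configuration and region below are placeholders for
 * live cluster state supplied by the caller:
 *
 * <pre>{@code
 * FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, conf);
 * helper.initialize(); // must be called before any placement method
 * if (helper.canPlaceFavoredNodes()) {
 *   List<ServerName> favoredNodes = helper.generateFavoredNodes(regionInfo);
 * }
 * }</pre>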
 */
@InterfaceAudience.Private
public class FavoredNodeAssignmentHelper {
  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeAssignmentHelper.class);
  private RackManager rackManager;
  private Map<String, List<ServerName>> rackToRegionServerMap;
  private List<String> uniqueRackList;
  // This map serves as a cache for server-to-rack lookups. The number of
  // region server entries might not match what is in servers.
  private Map<String, String> regionServerToRackMap;
  private List<ServerName> servers;
  public static final byte[] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
  public final static short FAVORED_NODES_NUM = 3;
  public final static short MAX_ATTEMPTS_FN_GENERATION = 10;

  public FavoredNodeAssignmentHelper(final List<ServerName> servers, Configuration conf) {
    this(servers, new RackManager(conf));
  }

  public FavoredNodeAssignmentHelper(final List<ServerName> servers,
    final RackManager rackManager) {
    this.servers = servers;
    this.rackManager = rackManager;
    this.rackToRegionServerMap = new HashMap<>();
    this.regionServerToRackMap = new HashMap<>();
    this.uniqueRackList = new ArrayList<>();
  }

  // Always initialize() when FavoredNodeAssignmentHelper is constructed.
  public void initialize() {
    for (ServerName sn : this.servers) {
      String rackName = getRackOfServer(sn);
      List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
      if (serverList == null) {
        serverList = Lists.newArrayList();
        // Add the current rack to the unique rack list
        this.uniqueRackList.add(rackName);
        this.rackToRegionServerMap.put(rackName, serverList);
      }
      boolean alreadyPresent = false;
      for (ServerName serverName : serverList) {
        if (ServerName.isSameAddress(sn, serverName)) {
          alreadyPresent = true;
          break;
        }
      }
      if (!alreadyPresent) {
        // Only add the server if the same address is not already present.
        serverList.add(sn);
      }
      this.regionServerToRackMap.put(sn.getHostname(), rackName);
    }
  }

  /**
   * Update meta table with favored nodes info
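   * <p>
   * A sketch of typical use; the input map is whatever assignment the caller has computed:
   *
   * <pre>{@code
   * Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
   * regionToFavoredNodes.put(regionInfo, favoredNodes); // computed elsewhere
   * FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, connection);
   * }</pre>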
   * @param regionToFavoredNodes map of RegionInfo's to their favored nodes
   * @param connection           connection to be used
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
    throws IOException {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
      Put put = makePut(entry.getKey(), entry.getValue());
      if (put != null) {
        puts.add(put);
      }
    }
    try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      table.put(puts);
    }
    LOG.info("Added " + puts.size() + " region favored nodes in META");
  }

  /**
   * Update meta table with favored nodes info
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
    // Write the region assignments to the meta table.
    // TODO: The overload above takes a Connection rather than a Configuration only because the
    // Connection is a short-circuit connection. That is not going to be good in all cases, such
    // as when master and meta are not colocated. Fix when this favored nodes feature is actually
    // used someday.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
    }
  }

  private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
    throws IOException {
    if (CollectionUtils.isEmpty(favoredNodeList)) {
      return null;
    }
    long time = EnvironmentEdgeManager.currentTime();
    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
    byte[] favoredNodes = getFavoredNodes(favoredNodeList);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
      .setType(Cell.Type.Put).setValue(favoredNodes).build());
    LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
      favoredNodeList);
    return put;
  }

  /**
   * Convert PB bytes to ServerName.
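   * <p>
   * A round trip with {@link #getFavoredNodes(List)} preserves hosts and ports but not start
   * codes, since favored nodes are serialized with {@link ServerName#NON_STARTCODE}:
   *
   * <pre>{@code
   * byte[] bytes = FavoredNodeAssignmentHelper.getFavoredNodes(serverList);
   * ServerName[] roundTripped = FavoredNodeAssignmentHelper.getFavoredNodesList(bytes);
   * }</pre>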
   * @param favoredNodes The PB'ed bytes of favored nodes
   * @return the array of {@link ServerName} for the byte array of favored nodes.
   */
  public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
    FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
    List<HBaseProtos.ServerName> protoNodes = f.getFavoredNodeList();
    ServerName[] servers = new ServerName[protoNodes.size()];
    int i = 0;
    for (HBaseProtos.ServerName node : protoNodes) {
      servers[i++] = ProtobufUtil.toServerName(node);
    }
    return servers;
  }

  /** Returns PB'ed bytes of {@link FavoredNodes} generated by the server list. */
  public static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
    FavoredNodes.Builder f = FavoredNodes.newBuilder();
    for (ServerName s : serverAddrList) {
      HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
      b.setHostName(s.getHostname());
      b.setPort(s.getPort());
      b.setStartCode(ServerName.NON_STARTCODE);
      f.addFavoredNode(b.build());
    }
    return f.build().toByteArray();
  }

  // Place the regions round-robin across the racks, picking one server from each
  // rack at a time for choosing a primary. Start with a random rack, and a random
  // server from every rack. If a rack doesn't have enough servers, it will go to
  // the next rack and so on.
  // For example, with 4 racks (r1 .. r4) of 8 servers (s1..s8) each, one possible
  // placement could be r2:s5, r3:s5, r4:s5, r1:s5, r2:s6, r3:s6..
  // If there were fewer servers in one rack, say r3 with 3 servers, one possible
  // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ...
  // The regions should be distributed proportionally to the rack sizes.
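  // A hedged usage sketch (the maps are created and owned by the caller):
  //   Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
  //   helper.placePrimaryRSAsRoundRobin(null, primaryRSMap, regions); // assignmentMap may be null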
  public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignmentMap,
    Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
    List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
    rackList.addAll(rackToRegionServerMap.keySet());
    int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
    int maxRackSize = 0;
    for (Map.Entry<String, List<ServerName>> r : rackToRegionServerMap.entrySet()) {
      if (r.getValue().size() > maxRackSize) {
        maxRackSize = r.getValue().size();
      }
    }
    int numIterations = 0;
    // Initialize the current processing host index.
    int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
    for (RegionInfo regionInfo : regions) {
      List<ServerName> currentServerList;
      String rackName;
      while (true) {
        rackName = rackList.get(rackIndex);
        numIterations++;
        // Get the server list for the current rack
        currentServerList = rackToRegionServerMap.get(rackName);

        if (serverIndex >= currentServerList.size()) { // not enough machines in this rack
          if (numIterations % rackList.size() == 0) {
            if (++serverIndex >= maxRackSize) serverIndex = 0;
          }
          if (++rackIndex >= rackList.size()) {
            rackIndex = 0; // reset the rack index to 0
          }
        } else break;
      }

      // Get the region server to process currently
      ServerName currentServer = currentServerList.get(serverIndex);

      // Place the current region with the current primary region server
      primaryRSMap.put(regionInfo, currentServer);
      if (assignmentMap != null) {
        List<RegionInfo> regionsForServer = assignmentMap.get(currentServer);
        if (regionsForServer == null) {
          regionsForServer = new ArrayList<>();
          assignmentMap.put(currentServer, regionsForServer);
        }
        regionsForServer.add(regionInfo);
      }

      // Set the next processing index
      if (numIterations % rackList.size() == 0) {
        ++serverIndex;
      }
      if (++rackIndex >= rackList.size()) {
        rackIndex = 0; // reset the rack index to 0
      }
    }
  }
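
  /**
   * For each region in {@code primaryRSMap}, picks a secondary and a tertiary region server and
   * returns them as a two-element array. Regions whose placement fails are skipped rather than
   * failing the whole call.
   */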
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryRS(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
    for (Map.Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Create the secondary and tertiary region server pair object.
        ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Placed the secondary and tertiary region servers for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }

  public ServerName[] getSecondaryAndTertiary(RegionInfo regionInfo, ServerName primaryRS)
    throws IOException {
    // Get the rack for the primary region server
    String primaryRack = getRackOfServer(primaryRS);

    ServerName[] favoredNodes;
    if (getTotalNumberOfRacks() == 1) {
      favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
    } else {
      favoredNodes = multiRackCase(primaryRS, primaryRack);
    }
    return favoredNodes;
  }

  private Map<ServerName, Set<RegionInfo>>
    mapRSToPrimaries(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> primaryServerMap = new HashMap<>();
    for (Entry<RegionInfo, ServerName> e : primaryRSMap.entrySet()) {
      Set<RegionInfo> currentSet = primaryServerMap.get(e.getValue());
      if (currentSet == null) {
        currentSet = new HashSet<>();
      }
      currentSet.add(e.getKey());
      primaryServerMap.put(e.getValue(), currentSet);
    }
    return primaryServerMap;
  }

  /**
   * For regions that share the primary, avoid placing the secondary and tertiary on the same RS.
   * Used for generating new assignments for the primary/secondary/tertiary RegionServers
   * @return the map of regions to the servers the region-files should be hosted on
   */
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryWithRestrictions(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> serverToPrimaries = mapRSToPrimaries(primaryRSMap);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Get the rack for the primary region server
        String primaryRack = getRackOfServer(primaryRS);
        ServerName[] favoredNodes = null;
        if (getTotalNumberOfRacks() == 1) {
          // Single rack case: have to pick the secondary and tertiary
          // from the same rack
          favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
        } else {
          favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries, secondaryAndTertiaryMap,
            primaryRack, primaryRS, regionInfo);
        }
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Placed the secondary and tertiary region servers for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }

  private ServerName[] multiRackCaseWithRestrictions(
    Map<ServerName, Set<RegionInfo>> serverToPrimaries,
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap, String primaryRack, ServerName primaryRS,
    RegionInfo regionInfo) throws IOException {
    // Randomly choose a rack other than the primary rack, then pick the
    // secondary and tertiary region servers from that rack.
    Set<String> rackSkipSet = new HashSet<>();
    rackSkipSet.add(primaryRack);
    String secondaryRack = getOneRandomRack(rackSkipSet);
    List<ServerName> serverList = getServersFromRack(secondaryRack);
    Set<ServerName> serverSet = new HashSet<>(serverList);
    ServerName[] favoredNodes;
    if (serverList.size() >= 2) {
      // Randomly pick two servers from this secondary rack.
      // Skip the secondary for the tertiary placement, and skip the servers
      // which already share the primary.
      Set<RegionInfo> primaries = serverToPrimaries.get(primaryRS);
      Set<ServerName> skipServerSet = new HashSet<>();
      while (true) {
        ServerName[] secondaryAndTertiary = null;
        if (primaries.size() > 1) {
          // Check where the secondaries and tertiaries of the other primaries are
          for (RegionInfo primary : primaries) {
            secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
            if (secondaryAndTertiary != null) {
              if (getRackOfServer(secondaryAndTertiary[0]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[0]);
              }
              if (getRackOfServer(secondaryAndTertiary[1]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[1]);
              }
            }
          }
        }
        if (skipServerSet.size() + 2 <= serverSet.size()) break;
        skipServerSet.clear();
        rackSkipSet.add(secondaryRack);
        // we used all racks
        if (rackSkipSet.size() == getTotalNumberOfRacks()) {
          // remove the last two added and break
          if (secondaryAndTertiary != null) {
            skipServerSet.remove(secondaryAndTertiary[0]);
            skipServerSet.remove(secondaryAndTertiary[1]);
          }
          break;
        }
        secondaryRack = getOneRandomRack(rackSkipSet);
        serverList = getServersFromRack(secondaryRack);
        serverSet = new HashSet<>(serverList);
      }

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
      skipServerSet.add(secondaryRS);
      // Place the tertiary RS
      ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary and tertiary region server for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
    } else {
      // Pick the secondary RS from this secondary rack
      // and pick the tertiary from another random rack
      favoredNodes = new ServerName[2];
      ServerName secondary = getOneRandomServer(secondaryRack);
      favoredNodes[0] = secondary;

      // Pick the tertiary
      if (getTotalNumberOfRacks() == 2) {
        // Pick the tertiary from the same rack as the primary RS
        Set<ServerName> serverSkipSet = new HashSet<>();
        serverSkipSet.add(primaryRS);
        favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
      } else {
        // Pick the tertiary from another rack
        rackSkipSet.add(secondaryRack);
        String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
        favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
      }
    }
    return favoredNodes;
  }

  private ServerName[] singleRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {
    // Single rack case: have to pick the secondary and tertiary
    // from the same rack
    List<ServerName> serverList = getServersFromRack(primaryRack);
    if ((serverList == null) || (serverList.size() <= 2)) {
      // Not enough region servers: cannot place the favored nodes
      // on any server;
      return null;
    } else {
      // Randomly select two region servers from the server list and make sure
      // they do not overlap with the primary region server;
      Set<ServerName> serverSkipSet = new HashSet<>();
      serverSkipSet.add(primaryRS);

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet);
      // Skip the secondary for the tertiary placement
      serverSkipSet.add(secondaryRS);
      ServerName tertiaryRS = getOneRandomServer(primaryRack, serverSkipSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary, tertiary favored node for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      ServerName[] favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
      return favoredNodes;
    }
  }

  /**
   * Place secondary and tertiary nodes in a multi-rack case. If there are only two racks, then we
   * try to place the secondary and tertiary on a different rack than the primary. But if the
   * other rack has only one region server, then we place the primary and tertiary on one rack and
   * the secondary on another. The aim is to distribute the three favored nodes on >= 2 racks.
   * TODO: see how we can use the generateMissingFavoredNodeMultiRack API here
   * @param primaryRS   The primary favored node.
   * @param primaryRack The rack of the primary favored node.
   * @return Array containing secondary and tertiary favored nodes.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private ServerName[] multiRackCase(ServerName primaryRS, String primaryRack) throws IOException {
    List<ServerName> favoredNodes = Lists.newArrayList(primaryRS);
    // Create the secondary and tertiary pair
    ServerName secondaryRS = generateMissingFavoredNodeMultiRack(favoredNodes);
    favoredNodes.add(secondaryRS);
    String secondaryRack = getRackOfServer(secondaryRS);

    ServerName tertiaryRS;
    if (primaryRack.equals(secondaryRack)) {
      tertiaryRS = generateMissingFavoredNode(favoredNodes);
    } else {
      // Try to place the tertiary in the secondary RS rack, else place it on the primary rack.
      tertiaryRS = getOneRandomServer(secondaryRack, Sets.newHashSet(secondaryRS));
      if (tertiaryRS == null) {
        tertiaryRS = getOneRandomServer(primaryRack, Sets.newHashSet(primaryRS));
      }
      // We couldn't find anything in the secondary rack, get any FN
      if (tertiaryRS == null) {
        tertiaryRS = generateMissingFavoredNode(Lists.newArrayList(primaryRS, secondaryRS));
      }
    }
    return new ServerName[] { secondaryRS, tertiaryRS };
  }

  public boolean canPlaceFavoredNodes() {
    return (this.servers.size() >= FAVORED_NODES_NUM);
  }

  private int getTotalNumberOfRacks() {
    return this.uniqueRackList.size();
  }

  private List<ServerName> getServersFromRack(String rack) {
    return this.rackToRegionServerMap.get(rack);
  }

  /**
   * Gets a random server from the specified rack and skips anything specified.
   * @param rack          the rack from which a server is needed
   * @param skipServerSet the returned server should not belong to this set
   */
  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
    // Is the rack valid? Do we recognize it?
    if (rack == null || getServersFromRack(rack) == null || getServersFromRack(rack).isEmpty()) {
      return null;
    }

    // Let's use a set so we can eliminate duplicates
    Set<StartcodeAgnosticServerName> serversToChooseFrom = Sets.newHashSet();
    for (ServerName sn : getServersFromRack(rack)) {
      serversToChooseFrom.add(StartcodeAgnosticServerName.valueOf(sn));
    }

    if (skipServerSet != null && skipServerSet.size() > 0) {
      for (ServerName sn : skipServerSet) {
        serversToChooseFrom.remove(StartcodeAgnosticServerName.valueOf(sn));
      }
      // Do we have any servers left to choose from?
      if (serversToChooseFrom.isEmpty()) {
        return null;
      }
    }

    ServerName randomServer = null;
    int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
    int j = 0;
    for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
      if (j == randomIndex) {
        randomServer = sn;
        break;
      }
      j++;
    }

    if (randomServer != null) {
      return ServerName.valueOf(randomServer.getAddress(), randomServer.getStartcode());
    } else {
      return null;
    }
  }

  private ServerName getOneRandomServer(String rack) throws IOException {
    return this.getOneRandomServer(rack, null);
  }

  String getOneRandomRack(Set<String> skipRackSet) throws IOException {
    if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
      throw new IOException("Cannot randomly pick another random rack");
    }

    String randomRack;
    do {
      int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
      randomRack = this.uniqueRackList.get(randomIndex);
    } while (skipRackSet.contains(randomRack));

    return randomRack;
  }
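
  /**
   * Joins the addresses of the given favored nodes with {@code ;}. For example (the ports here
   * are illustrative): {@code "host1:16020;host2:16020;host3:16020"}.
   */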
  public static String getFavoredNodesAsString(List<ServerName> nodes) {
    StringBuilder strBuf = new StringBuilder();
    int i = 0;
    for (ServerName node : nodes) {
      strBuf.append(node.getAddress());
      if (++i != nodes.size()) strBuf.append(";");
    }
    return strBuf.toString();
  }

  /*
   * Generates a missing favored node based on the input favored nodes. This helps to generate a
   * new FN when there are already 2 FNs and we need a third one, e.g. while generating new FNs for
   * split daughters after inheriting 2 FNs from the parent. If the cluster has only one rack, it
   * generates one from the same rack. If the cluster has multiple racks, it ensures the new FN
   * respects rack constraints similar to HDFS. For example, if there are 3 FNs, they will be
   * spread across 2 racks.
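   * A hypothetical sketch of the split-daughter case (parentFn1 and parentFn2 stand in for the
   * two favored nodes inherited from the parent):
   * ServerName third = helper.generateMissingFavoredNode(Lists.newArrayList(parentFn1, parentFn2));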
   */
  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, null);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, null);
    }
  }

  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, excludeNodes);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, excludeNodes);
    }
  }

  /*
   * Generates an FN for the single-rack scenario, making sure not to pick one of the excluded
   * nodes. Helps when we would like to find a replacement node.
   */
  private ServerName generateMissingFavoredNodeSingleRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    ServerName newServer = null;
    Set<ServerName> excludeFNSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      excludeFNSet.addAll(excludeNodes);
    }
    if (favoredNodes.size() < FAVORED_NODES_NUM) {
      newServer = this.getOneRandomServer(this.uniqueRackList.get(0), excludeFNSet);
    }
    return newServer;
  }

  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes)
    throws IOException {
    return generateMissingFavoredNodeMultiRack(favoredNodes, null);
  }

  /*
   * Generates a missing FN based on the input favoredNodes and the nodes to be skipped. It looks
   * at the current layout of the favored nodes and the excluded nodes, then picks a random node
   * that is consistent with HDFS block placement. E.g. if the existing nodes are on one rack, it
   * generates one from another rack. We exclude as much as possible so the random selection has a
   * better chance of generating a node within a few iterations, ideally 1.
   */
  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    Set<String> racks = Sets.newHashSet();
    Map<String, Set<ServerName>> rackToFNMapping = new HashMap<>();

    // Let's understand the current rack distribution of the FN
    for (ServerName sn : favoredNodes) {
      String rack = getRackOfServer(sn);
      racks.add(rack);

      Set<ServerName> serversInRack = rackToFNMapping.get(rack);
      if (serversInRack == null) {
        serversInRack = Sets.newHashSet();
        rackToFNMapping.put(rack, serversInRack);
      }
      serversInRack.add(sn);
    }

    // What racks should be skipped while getting a FN?
    Set<String> skipRackSet = Sets.newHashSet();

    /*
     * If both the FN are from the same rack, then we don't want to generate another FN on the same
     * rack. If that rack fails, the region would be unavailable.
     */
    if (racks.size() == 1 && favoredNodes.size() > 1) {
      skipRackSet.add(racks.iterator().next());
    }

    /*
     * If there are no free nodes on the existing racks, we should skip those racks too. We can
     * reduce the number of iterations for FN selection.
     */
    for (String rack : racks) {
      if (
        getServersFromRack(rack) != null
          && rackToFNMapping.get(rack).size() == getServersFromRack(rack).size()
      ) {
        skipRackSet.add(rack);
      }
    }

    Set<ServerName> favoredNodeSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      favoredNodeSet.addAll(excludeNodes);
    }

    /*
     * Let's get a random rack by excluding skipRackSet and generate a random FN from that rack.
     */
    int i = 0;
    Set<String> randomRacks = Sets.newHashSet();
    ServerName newServer = null;
    do {
      String randomRack = this.getOneRandomRack(skipRackSet);
      newServer = this.getOneRandomServer(randomRack, favoredNodeSet);
      randomRacks.add(randomRack);
      i++;
    } while ((i < MAX_ATTEMPTS_FN_GENERATION) && (newServer == null));

    if (newServer == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(String.format(
          "Unable to generate additional favored nodes for %s after "
            + "considering racks %s and skip rack %s with a unique rack list of %s and rack "
            + "to RS map of %s and RS to rack map of %s",
          StringUtils.join(favoredNodes, ","), randomRacks, skipRackSet, uniqueRackList,
          rackToRegionServerMap, regionServerToRackMap));
      }
      throw new IOException(
        "Unable to generate additional favored nodes for " + StringUtils.join(favoredNodes, ","));
    }
    return newServer;
  }

  /*
   * Generate favored nodes for a region. Choose a random server as the primary, then choose the
   * secondary and tertiary FNs so they are spread across two racks.
   */
  public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {
    List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
    ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));

    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
    primaryRSMap.put(hri, primary);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);
    ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri);
    if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
      for (ServerName sn : secondaryAndTertiaryNodes) {
        favoredNodesForRegion.add(ServerName.valueOf(sn.getAddress(), ServerName.NON_STARTCODE));
      }
      return favoredNodesForRegion;
    } else {
      throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes.");
    }
  }
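
  /**
   * Generates favored nodes for the given regions, distributing the primaries round-robin across
   * racks. Returns {@code null} when {@code regions} is empty.
   */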
  public Map<RegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
    if (regions.size() > 0) {
      if (canPlaceFavoredNodes()) {
        Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
        // Let's try to have an equal distribution for primary favored nodes
        placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
        return generateFavoredNodes(primaryRSMap);
      } else {
        throw new HBaseIOException("Not enough nodes to generate favored nodes");
      }
    }
    return null;
  }

  /*
   * Generate favored nodes for a set of regions when we know where they are currently hosted.
   */
  private Map<RegionInfo, List<ServerName>>
    generateFavoredNodes(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, List<ServerName>> generatedFavNodes = new HashMap<>();
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
      RegionInfo region = entry.getKey();
      ServerName primarySN = entry.getValue();
      favoredNodesForRegion
        .add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(), NON_STARTCODE));
      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
      if (secondaryAndTertiaryNodes != null) {
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
          secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
          secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
      }
      generatedFavNodes.put(region, favoredNodesForRegion);
    }
    return generatedFavNodes;
  }

  /**
   * Gets the rack of a server from the local mapping when present; saves a lookup through the
   * RackManager.
   */
  private String getRackOfServer(ServerName sn) {
    if (this.regionServerToRackMap.containsKey(sn.getHostname())) {
      return this.regionServerToRackMap.get(sn.getHostname());
    } else {
      String rack = this.rackManager.getRack(sn);
      this.regionServerToRackMap.put(sn.getHostname(), rack);
      return rack;
    }
  }
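
  /**
   * Resolves the local datanode port from the HDFS configuration, falling back to the HDFS
   * default when {@code dfs.datanode.address} is not set.
   */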
  public static int getDataNodePort(Configuration conf) {
    HdfsConfiguration.init();
    Configuration dnConf = new HdfsConfiguration(conf);
    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
    return dnPort;
  }
}