/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.favored;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;

/**
 * Helper class for FavoredNodeLoadBalancer that has all the intelligence for racks, meta scans,
 * etc. Instantiated by the FavoredNodeLoadBalancer when needed (from within calls like
 * FavoredNodeLoadBalancer#randomAssignment(RegionInfo, List)). All updates to favored nodes should
 * only be done from FavoredNodesManager and not through this helper class (except for tests).
 */
@InterfaceAudience.Private
public class FavoredNodeAssignmentHelper {
  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeAssignmentHelper.class);
  private RackManager rackManager;
  private Map<String, List<ServerName>> rackToRegionServerMap;
  private List<String> uniqueRackList;
  // This map serves as a cache for rack to sn lookups. The number of
  // region server entries might not match what is in servers.
  private Map<String, String> regionServerToRackMap;
  private List<ServerName> servers;
  public static final byte[] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
  public final static short FAVORED_NODES_NUM = 3;
  public final static short MAX_ATTEMPTS_FN_GENERATION = 10;

  public FavoredNodeAssignmentHelper(final List<ServerName> servers, Configuration conf) {
    this(servers, new RackManager(conf));
  }

  public FavoredNodeAssignmentHelper(final List<ServerName> servers,
    final RackManager rackManager) {
    this.servers = servers;
    this.rackManager = rackManager;
    this.rackToRegionServerMap = new HashMap<>();
    this.regionServerToRackMap = new HashMap<>();
    this.uniqueRackList = new ArrayList<>();
  }

  // Always initialize() when FavoredNodeAssignmentHelper is constructed.
  public void initialize() {
    for (ServerName sn : this.servers) {
      String rackName = getRackOfServer(sn);
      List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
      if (serverList == null) {
        serverList = Lists.newArrayList();
        // Add the current rack to the unique rack list
        this.uniqueRackList.add(rackName);
        this.rackToRegionServerMap.put(rackName, serverList);
      }
      boolean alreadyPresent = false;
      for (ServerName serverName : serverList) {
        if (ServerName.isSameAddress(sn, serverName)) {
          // A server with the same address is already present, ignore it.
          alreadyPresent = true;
          break;
        }
      }
      if (!alreadyPresent) {
        serverList.add(sn);
      }
      this.regionServerToRackMap.put(sn.getHostname(), rackName);
    }
  }
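  /*
   * A minimal usage sketch of this helper (illustrative only; "conf", "servers" and "region" are
   * assumed to be supplied by the caller):
   *
   *   FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, conf);
   *   helper.initialize(); // always initialize before calling any placement method
   *   if (helper.canPlaceFavoredNodes()) {
   *     List<ServerName> favoredNodes = helper.generateFavoredNodes(region);
   *   }
   */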
  /**
   * Update meta table with favored nodes info
   * @param regionToFavoredNodes map of RegionInfo's to their favored nodes
   * @param connection           connection to be used
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
    throws IOException {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
      Put put = makePut(entry.getKey(), entry.getValue());
      if (put != null) {
        puts.add(put);
      }
    }
    try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      table.put(puts);
    }
    LOG.info("Added " + puts.size() + " region favored nodes in META");
  }

  /**
   * Update meta table with favored nodes info
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
    // Write the region assignments to the meta table.
    // TODO: the overload above takes a Connection rather than a Configuration, but that
    // Connection is a short-circuit connection. That is not going to be good in all cases,
    // e.g. when master and meta are not colocated. Fix when this favored nodes feature is
    // actually used someday.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
    }
  }

  private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
    throws IOException {
    if (CollectionUtils.isEmpty(favoredNodeList)) {
      return null;
    }
    long time = EnvironmentEdgeManager.currentTime();
    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
    byte[] favoredNodes = getFavoredNodes(favoredNodeList);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
      .setType(Cell.Type.Put).setValue(favoredNodes).build());
    LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
      favoredNodeList);
    return put;
  }

  /**
   * Convert PB bytes to ServerName.
   * @param favoredNodes The PB'ed bytes of favored nodes
   * @return the array of {@link ServerName} for the byte array of favored nodes.
   */
  public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
    FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
    List<HBaseProtos.ServerName> protoNodes = f.getFavoredNodeList();
    ServerName[] servers = new ServerName[protoNodes.size()];
    int i = 0;
    for (HBaseProtos.ServerName node : protoNodes) {
      servers[i++] = ProtobufUtil.toServerName(node);
    }
    return servers;
  }

  /**
   * @param serverAddrList the list of favored node servers
   * @return PB'ed bytes of {@link FavoredNodes} generated by the server list.
   */
  public static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
    FavoredNodes.Builder f = FavoredNodes.newBuilder();
    for (ServerName s : serverAddrList) {
      HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
      b.setHostName(s.getHostname());
      b.setPort(s.getPort());
      b.setStartCode(ServerName.NON_STARTCODE);
      f.addFavoredNode(b.build());
    }
    return f.build().toByteArray();
  }
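  /*
   * Illustrative round-trip for the serialization helpers above (not part of the class; "nodes"
   * is an assumed List<ServerName>):
   *
   *   byte[] bytes = FavoredNodeAssignmentHelper.getFavoredNodes(nodes);
   *   ServerName[] decoded = FavoredNodeAssignmentHelper.getFavoredNodesList(bytes);
   *   // Note: the decoded ServerNames carry ServerName.NON_STARTCODE rather than the
   *   // original start codes, since only host and port are persisted.
   */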
  // Place the regions round-robin across the racks, picking one server from each
  // rack at a time. Start with a random rack and a random server within every rack.
  // If a rack doesn't have enough servers, placement moves on to the next rack and
  // so on for choosing a primary.
  // For example, if there are 4 racks (r1 .. r4) with 8 servers (s1..s8) each, one
  // possible placement could be r2:s5, r3:s5, r4:s5, r1:s5, r2:s6, r3:s6..
  // If there were fewer servers in one rack, say r3 with only 3 servers, one possible
  // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ...
  // The regions should be distributed proportionally to the rack sizes.
  public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignmentMap,
    Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
    List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
    rackList.addAll(rackToRegionServerMap.keySet());
    int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
    int maxRackSize = 0;
    for (Map.Entry<String, List<ServerName>> r : rackToRegionServerMap.entrySet()) {
      if (r.getValue().size() > maxRackSize) {
        maxRackSize = r.getValue().size();
      }
    }
    int numIterations = 0;
    // Initialize the current processing host index.
    int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
    for (RegionInfo regionInfo : regions) {
      List<ServerName> currentServerList;
      String rackName;
      while (true) {
        rackName = rackList.get(rackIndex);
        numIterations++;
        // Get the server list for the current rack
        currentServerList = rackToRegionServerMap.get(rackName);

        if (serverIndex >= currentServerList.size()) {
          // Not enough machines in this rack
          if (numIterations % rackList.size() == 0) {
            if (++serverIndex >= maxRackSize) {
              serverIndex = 0;
            }
          }
          if ((++rackIndex) >= rackList.size()) {
            rackIndex = 0; // reset the rack index to 0
          }
        } else {
          break;
        }
      }

      // Get the current process region server
      ServerName currentServer = currentServerList.get(serverIndex);

      // Place the current region with the current primary region server
      primaryRSMap.put(regionInfo, currentServer);
      if (assignmentMap != null) {
        List<RegionInfo> regionsForServer = assignmentMap.get(currentServer);
        if (regionsForServer == null) {
          regionsForServer = new ArrayList<>();
          assignmentMap.put(currentServer, regionsForServer);
        }
        regionsForServer.add(regionInfo);
      }

      // Set the next processing index
      if (numIterations % rackList.size() == 0) {
        ++serverIndex;
      }
      if ((++rackIndex) >= rackList.size()) {
        rackIndex = 0; // reset the rack index to 0
      }
    }
  }
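  /*
   * Minimal sketch of driving the round-robin primary placement above (illustrative; "helper" is
   * an initialized instance and "regions" an assumed list):
   *
   *   Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
   *   Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
   *   helper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
   *   // primaryRSMap now maps each region to its primary; assignmentMap groups the
   *   // regions by the server they were assigned to.
   */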
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryRS(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
    for (Map.Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Create the secondary and tertiary region server pair object.
        ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Place the secondary and tertiary region server for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }

  public ServerName[] getSecondaryAndTertiary(RegionInfo regionInfo, ServerName primaryRS)
    throws IOException {
    // Get the rack for the primary region server
    String primaryRack = getRackOfServer(primaryRS);

    ServerName[] favoredNodes;
    if (getTotalNumberOfRacks() == 1) {
      favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
    } else {
      favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack);
    }
    return favoredNodes;
  }

  private Map<ServerName, Set<RegionInfo>>
    mapRSToPrimaries(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> primaryServerMap = new HashMap<>();
    for (Entry<RegionInfo, ServerName> e : primaryRSMap.entrySet()) {
      Set<RegionInfo> currentSet = primaryServerMap.get(e.getValue());
      if (currentSet == null) {
        currentSet = new HashSet<>();
      }
      currentSet.add(e.getKey());
      primaryServerMap.put(e.getValue(), currentSet);
    }
    return primaryServerMap;
  }

  /**
   * For regions that share the primary, avoid placing the secondary and tertiary on the same RS.
   * Used for generating new assignments for the primary/secondary/tertiary RegionServers.
   * @param primaryRSMap map of each region to its primary region server
   * @return the map of regions to the servers the region-files should be hosted on
   */
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryWithRestrictions(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> serverToPrimaries = mapRSToPrimaries(primaryRSMap);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Get the rack for the primary region server
        String primaryRack = getRackOfServer(primaryRS);
        ServerName[] favoredNodes = null;
        if (getTotalNumberOfRacks() == 1) {
          // Single rack case: have to pick the secondary and tertiary
          // from the same rack
          favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
        } else {
          favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries, secondaryAndTertiaryMap,
            primaryRack, primaryRS, regionInfo);
        }
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Place the secondary and tertiary region server for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }
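  /*
   * Illustrative follow-up to the restricted placement above ("helper" and "primaryRSMap" are
   * assumed): regions that share a primary get their secondary/tertiary spread over distinct
   * servers where rack capacity allows.
   *
   *   Map<RegionInfo, ServerName[]> secondaryAndTertiary =
   *     helper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
   */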
  private ServerName[] multiRackCaseWithRestrictions(
    Map<ServerName, Set<RegionInfo>> serverToPrimaries,
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap, String primaryRack, ServerName primaryRS,
    RegionInfo regionInfo) throws IOException {
    // Randomly choose the secondary and tertiary region servers from another rack:
    // pick one random rack other than the rack of the primary.
    Set<String> rackSkipSet = new HashSet<>();
    rackSkipSet.add(primaryRack);
    String secondaryRack = getOneRandomRack(rackSkipSet);
    List<ServerName> serverList = getServersFromRack(secondaryRack);
    Set<ServerName> serverSet = new HashSet<>(serverList);
    ServerName[] favoredNodes;
    if (serverList.size() >= 2) {
      // Randomly pick two servers from this secondary rack.
      // Skip the secondary for the tertiary placement, and skip the servers
      // which already share the primary.
      Set<RegionInfo> primaries = serverToPrimaries.get(primaryRS);
      Set<ServerName> skipServerSet = new HashSet<>();
      while (true) {
        ServerName[] secondaryAndTertiary = null;
        if (primaries.size() > 1) {
          // Check where the secondary and tertiary of the sharing regions are
          for (RegionInfo primary : primaries) {
            secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
            if (secondaryAndTertiary != null) {
              if (getRackOfServer(secondaryAndTertiary[0]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[0]);
              }
              if (getRackOfServer(secondaryAndTertiary[1]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[1]);
              }
            }
          }
        }
        if (skipServerSet.size() + 2 <= serverSet.size()) {
          break;
        }
        skipServerSet.clear();
        rackSkipSet.add(secondaryRack);
        // We used all racks
        if (rackSkipSet.size() == getTotalNumberOfRacks()) {
          // Remove the last two added and break
          if (secondaryAndTertiary != null) {
            skipServerSet.remove(secondaryAndTertiary[0]);
            skipServerSet.remove(secondaryAndTertiary[1]);
          }
          break;
        }
        secondaryRack = getOneRandomRack(rackSkipSet);
        serverList = getServersFromRack(secondaryRack);
        serverSet = new HashSet<>(serverList);
      }

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
      skipServerSet.add(secondaryRS);
      // Place the tertiary RS
      ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary and tertiary region server for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
    } else {
      // Pick the secondary RS from this secondary rack
      // and pick the tertiary from another random rack
      favoredNodes = new ServerName[2];
      ServerName secondary = getOneRandomServer(secondaryRack);
      favoredNodes[0] = secondary;

      // Pick the tertiary
      if (getTotalNumberOfRacks() == 2) {
        // Pick the tertiary from the same rack as the primary RS
        Set<ServerName> serverSkipSet = new HashSet<>();
        serverSkipSet.add(primaryRS);
        favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
      } else {
        // Pick the tertiary from another rack
        rackSkipSet.add(secondaryRack);
        String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
        favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
      }
    }
    return favoredNodes;
  }
  private ServerName[] singleRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {
    // Single rack case: have to pick the secondary and tertiary
    // from the same rack
    List<ServerName> serverList = getServersFromRack(primaryRack);
    if ((serverList == null) || (serverList.size() <= 2)) {
      // Not enough region servers in this rack: cannot place the favored
      // nodes on distinct servers
      return null;
    } else {
      // Randomly select two region servers from the server list and make sure
      // they do not overlap with the primary region server
      Set<ServerName> serverSkipSet = new HashSet<>();
      serverSkipSet.add(primaryRS);

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet);
      // Skip the secondary for the tertiary placement
      serverSkipSet.add(secondaryRS);
      ServerName tertiaryRS = getOneRandomServer(primaryRack, serverSkipSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary, tertiary favored node for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      ServerName[] favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
      return favoredNodes;
    }
  }

  /**
   * Place secondary and tertiary nodes in a multi-rack case. If there are only two racks, then we
   * try to place the secondary and tertiary on a different rack than the primary. But if the
   * other rack has only one region server, then we place the primary and tertiary on one rack and
   * the secondary on another. The aim is to distribute the three favored nodes on >= 2 racks.
   * TODO: see how we can use the generateMissingFavoredNodeMultiRack API here.
   * @param regionInfo  Region for which we are trying to generate FN
   * @param primaryRS   The primary favored node.
   * @param primaryRack The rack of the primary favored node.
   * @return Array containing secondary and tertiary favored nodes.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private ServerName[] multiRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {

    List<ServerName> favoredNodes = Lists.newArrayList(primaryRS);
    // Create the secondary and tertiary pair
    ServerName secondaryRS = generateMissingFavoredNodeMultiRack(favoredNodes);
    favoredNodes.add(secondaryRS);
    String secondaryRack = getRackOfServer(secondaryRS);

    ServerName tertiaryRS;
    if (primaryRack.equals(secondaryRack)) {
      tertiaryRS = generateMissingFavoredNode(favoredNodes);
    } else {
      // Try to place the tertiary in the secondary RS rack, else place it on the primary rack.
      tertiaryRS = getOneRandomServer(secondaryRack, Sets.newHashSet(secondaryRS));
      if (tertiaryRS == null) {
        tertiaryRS = getOneRandomServer(primaryRack, Sets.newHashSet(primaryRS));
      }
      // We couldn't find anything in the secondary rack, get any FN
      if (tertiaryRS == null) {
        tertiaryRS = generateMissingFavoredNode(Lists.newArrayList(primaryRS, secondaryRS));
      }
    }
    return new ServerName[] { secondaryRS, tertiaryRS };
  }

  public boolean canPlaceFavoredNodes() {
    return (this.servers.size() >= FAVORED_NODES_NUM);
  }

  private int getTotalNumberOfRacks() {
    return this.uniqueRackList.size();
  }

  private List<ServerName> getServersFromRack(String rack) {
    return this.rackToRegionServerMap.get(rack);
  }
  /**
   * Gets a random server from the specified rack and skips anything specified.
   * @param rack          rack from which a server is needed
   * @param skipServerSet servers that should not be chosen
   */
  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
    // Is the rack valid? Do we recognize it?
    if (rack == null || getServersFromRack(rack) == null || getServersFromRack(rack).isEmpty()) {
      return null;
    }

    // Lets use a set so we can eliminate duplicates
    Set<StartcodeAgnosticServerName> serversToChooseFrom = Sets.newHashSet();
    for (ServerName sn : getServersFromRack(rack)) {
      serversToChooseFrom.add(StartcodeAgnosticServerName.valueOf(sn));
    }

    if (skipServerSet != null && skipServerSet.size() > 0) {
      for (ServerName sn : skipServerSet) {
        serversToChooseFrom.remove(StartcodeAgnosticServerName.valueOf(sn));
      }
      // Do we have any servers left to choose from?
      if (serversToChooseFrom.isEmpty()) {
        return null;
      }
    }

    ServerName randomServer = null;
    int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
    int j = 0;
    for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
      if (j == randomIndex) {
        randomServer = sn;
        break;
      }
      j++;
    }

    if (randomServer != null) {
      return ServerName.valueOf(randomServer.getAddress(), randomServer.getStartcode());
    } else {
      return null;
    }
  }

  private ServerName getOneRandomServer(String rack) throws IOException {
    return this.getOneRandomServer(rack, null);
  }

  String getOneRandomRack(Set<String> skipRackSet) throws IOException {
    if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
      throw new IOException("Cannot randomly pick another rack");
    }

    String randomRack;
    do {
      int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
      randomRack = this.uniqueRackList.get(randomIndex);
    } while (skipRackSet.contains(randomRack));

    return randomRack;
  }

  public static String getFavoredNodesAsString(List<ServerName> nodes) {
    StringBuilder strBuf = new StringBuilder();
    int i = 0;
    for (ServerName node : nodes) {
      strBuf.append(node.getAddress());
      if (++i != nodes.size()) {
        strBuf.append(";");
      }
    }
    return strBuf.toString();
  }
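  /*
   * For reference, getFavoredNodesAsString produces a semicolon-separated list of host:port
   * addresses, e.g. (hypothetical hosts):
   *
   *   "rs1.example.com:16020;rs2.example.com:16020;rs3.example.com:16020"
   */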
  /*
   * Generates a missing favored node based on the input favored nodes. This helps to generate a
   * new FN when there are already two FN and we need a third one, e.g. while generating new FN
   * for split daughters after inheriting 2 FN from the parent. If the cluster has only one rack,
   * it generates the node from the same rack. If the cluster has multiple racks, then it ensures
   * the new FN respects rack constraints similar to HDFS; e.g. if there are 3 FN, they will be
   * spread across 2 racks.
   */
  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, null);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, null);
    }
  }

  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, excludeNodes);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, excludeNodes);
    }
  }
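  /*
   * Illustrative sketch of filling in a missing favored node, e.g. for a split daughter that
   * inherited two FN from its parent ("helper", "parentFN1" and "parentFN2" are assumed):
   *
   *   List<ServerName> inherited = Lists.newArrayList(parentFN1, parentFN2);
   *   ServerName third = helper.generateMissingFavoredNode(inherited);
   */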
  /*
   * Generate an FN for a single rack scenario, without using one of the excluded nodes. Helps
   * when we would like to find a replacement node.
   */
  private ServerName generateMissingFavoredNodeSingleRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    ServerName newServer = null;
    Set<ServerName> excludeFNSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      excludeFNSet.addAll(excludeNodes);
    }
    if (favoredNodes.size() < FAVORED_NODES_NUM) {
      newServer = this.getOneRandomServer(this.uniqueRackList.get(0), excludeFNSet);
    }
    return newServer;
  }

  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes)
    throws IOException {
    return generateMissingFavoredNodeMultiRack(favoredNodes, null);
  }

  /*
   * Generates a missing FN based on the input favoredNodes and the nodes to be skipped. Get the
   * current layout of the favored nodes arrangement and the nodes to be excluded, and get a
   * random node that goes with HDFS block placement. E.g. if the existing nodes are on one rack,
   * generate one from another rack. We exclude as much as possible so the random selection has a
   * better chance of generating a node within a few iterations, ideally 1.
   */
  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {

    Set<String> racks = Sets.newHashSet();
    Map<String, Set<ServerName>> rackToFNMapping = new HashMap<>();

    // Lets understand the current rack distribution of the FN
    for (ServerName sn : favoredNodes) {
      String rack = getRackOfServer(sn);
      racks.add(rack);

      Set<ServerName> serversInRack = rackToFNMapping.get(rack);
      if (serversInRack == null) {
        serversInRack = Sets.newHashSet();
        rackToFNMapping.put(rack, serversInRack);
      }
      serversInRack.add(sn);
    }

    // Which racks should be skipped while getting a FN?
    Set<String> skipRackSet = Sets.newHashSet();

    /*
     * If both of the FN are on the same rack, then we don't want to generate another FN on that
     * rack; if that rack fails, the region would be unavailable.
     */
    if (racks.size() == 1 && favoredNodes.size() > 1) {
      skipRackSet.add(racks.iterator().next());
    }

    /*
     * If there are no free nodes on the existing racks, we should skip those racks too. This
     * reduces the number of iterations for FN selection.
     */
    for (String rack : racks) {
      if (
        getServersFromRack(rack) != null
          && rackToFNMapping.get(rack).size() == getServersFromRack(rack).size()
      ) {
        skipRackSet.add(rack);
      }
    }

    Set<ServerName> favoredNodeSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      favoredNodeSet.addAll(excludeNodes);
    }

    /*
     * Lets get a random rack by excluding skipRackSet and generate a random FN from that rack.
     */
    int i = 0;
    Set<String> randomRacks = Sets.newHashSet();
    ServerName newServer = null;
    do {
      String randomRack = this.getOneRandomRack(skipRackSet);
      newServer = this.getOneRandomServer(randomRack, favoredNodeSet);
      randomRacks.add(randomRack);
      i++;
    } while ((i < MAX_ATTEMPTS_FN_GENERATION) && (newServer == null));

    if (newServer == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(String.format(
          "Unable to generate additional favored nodes for %s after "
            + "considering racks %s and skip rack %s with a unique rack list of %s and rack "
            + "to RS map of %s and RS to rack map of %s",
          StringUtils.join(favoredNodes, ","), randomRacks, skipRackSet, uniqueRackList,
          rackToRegionServerMap, regionServerToRackMap));
      }
      throw new IOException(
        "Unable to generate additional favored nodes for " + StringUtils.join(favoredNodes, ","));
    }
    return newServer;
  }

  /*
   * Generate favored nodes for a region. Choose a random server as the primary and then choose
   * the secondary and tertiary FN such that they are spread across two racks.
   */
  public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {

    List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
    ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));

    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
    primaryRSMap.put(hri, primary);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);
    ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri);
    if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
      for (ServerName sn : secondaryAndTertiaryNodes) {
        favoredNodesForRegion.add(ServerName.valueOf(sn.getAddress(), ServerName.NON_STARTCODE));
      }
      return favoredNodesForRegion;
    } else {
      throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes.");
    }
  }

  public Map<RegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {

    if (regions.size() > 0) {
      if (canPlaceFavoredNodes()) {
        Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
        // Lets try to have an equal distribution for primary favored node
        placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
        return generateFavoredNodes(primaryRSMap);
      } else {
        throw new HBaseIOException("Not enough nodes to generate favored nodes");
      }
    }
    return null;
  }
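  /*
   * Illustrative end-to-end generation for a batch of regions (assumed inputs; throws
   * HBaseIOException when fewer than FAVORED_NODES_NUM servers are available):
   *
   *   Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
   *   Map<RegionInfo, List<ServerName>> fnByRegion =
   *     helper.generateFavoredNodesRoundRobin(assignmentMap, regions);
   */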
  /*
   * Generate favored nodes for a set of regions when we know where they are currently hosted.
   */
  private Map<RegionInfo, List<ServerName>>
    generateFavoredNodes(Map<RegionInfo, ServerName> primaryRSMap) {

    Map<RegionInfo, List<ServerName>> generatedFavNodes = new HashMap<>();
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
      RegionInfo region = entry.getKey();
      ServerName primarySN = entry.getValue();
      favoredNodesForRegion
        .add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(), NON_STARTCODE));
      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
      if (secondaryAndTertiaryNodes != null) {
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
          secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
          secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
      }
      generatedFavNodes.put(region, favoredNodesForRegion);
    }
    return generatedFavNodes;
  }

  /**
   * Get the rack of a server from the local mapping when present; this saves a lookup by the
   * RackManager.
   */
  private String getRackOfServer(ServerName sn) {
    if (this.regionServerToRackMap.containsKey(sn.getHostname())) {
      return this.regionServerToRackMap.get(sn.getHostname());
    } else {
      String rack = this.rackManager.getRack(sn);
      this.regionServerToRackMap.put(sn.getHostname(), rack);
      return rack;
    }
  }

  public static int getDataNodePort(Configuration conf) {
    HdfsConfiguration.init();
    Configuration dnConf = new HdfsConfiguration(conf);
    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
    return dnPort;
  }
}