/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.favored;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FavoredNodes;

/**
 * Helper class for FavoredNodeLoadBalancer that has all the intelligence for racks, meta scans,
 * etc. Instantiated by the FavoredNodeLoadBalancer when needed (from within calls like
 * FavoredNodeLoadBalancer#randomAssignment(RegionInfo, List)). All updates to favored nodes should
 * only be done from FavoredNodesManager and not through this helper class (except for tests).
 */
@InterfaceAudience.Private
public class FavoredNodeAssignmentHelper {
  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeAssignmentHelper.class);
  private RackManager rackManager;
  private Map<String, List<ServerName>> rackToRegionServerMap;
  private List<String> uniqueRackList;
  // This map serves as a cache for rack to region server lookups. The number of
  // region server entries might not match what is in servers.
  private Map<String, String> regionServerToRackMap;
  private List<ServerName> servers;
  public static final byte[] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn");
  public final static short FAVORED_NODES_NUM = 3;
  public final static short MAX_ATTEMPTS_FN_GENERATION = 10;

  public FavoredNodeAssignmentHelper(final List<ServerName> servers, Configuration conf) {
    this(servers, new RackManager(conf));
  }

  public FavoredNodeAssignmentHelper(final List<ServerName> servers,
    final RackManager rackManager) {
    this.servers = servers;
    this.rackManager = rackManager;
    this.rackToRegionServerMap = new HashMap<>();
    this.regionServerToRackMap = new HashMap<>();
    this.uniqueRackList = new ArrayList<>();
  }

  // Always call initialize() after constructing a FavoredNodeAssignmentHelper.
  public void initialize() {
    for (ServerName sn : this.servers) {
      String rackName = getRackOfServer(sn);
      List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
      if (serverList == null) {
        serverList = Lists.newArrayList();
        // Add the current rack to the unique rack list
        this.uniqueRackList.add(rackName);
        this.rackToRegionServerMap.put(rackName, serverList);
      }
      boolean alreadyPresent = false;
      for (ServerName serverName : serverList) {
        if (ServerName.isSameAddress(sn, serverName)) {
          // The server is already present, ignore.
          alreadyPresent = true;
          break;
        }
      }
      if (!alreadyPresent) {
        serverList.add(sn);
      }
      this.regionServerToRackMap.put(sn.getHostname(), rackName);
    }
  }

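  /*
   * Illustrative usage (a sketch, not part of any production code path): the variable names
   * conf, onlineServers and region are placeholders for this example only. Callers such as
   * FavoredNodeLoadBalancer follow the same sequence: construct, initialize(), check
   * canPlaceFavoredNodes(), then generate.
   *
   *   FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(onlineServers, conf);
   *   helper.initialize(); // must be called before any placement method
   *   if (helper.canPlaceFavoredNodes()) {
   *     // three favored nodes (primary, secondary, tertiary), spread across >= 2 racks if possible
   *     List<ServerName> favoredNodes = helper.generateFavoredNodes(region);
   *   }
   */
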
  /**
   * Update meta table with favored nodes info
   * @param regionToFavoredNodes map of RegionInfo's to their favored nodes
   * @param connection           connection to be used
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Connection connection)
    throws IOException {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
      Put put = makePut(entry.getKey(), entry.getValue());
      if (put != null) {
        puts.add(put);
      }
    }
    try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      table.put(puts);
    }
    LOG.info("Added " + puts.size() + " region favored nodes in META");
  }

  /**
   * Update meta table with favored nodes info
   */
  public static void updateMetaWithFavoredNodesInfo(
    Map<RegionInfo, List<ServerName>> regionToFavoredNodes, Configuration conf) throws IOException {
    // Write the region assignments to the meta table.
    // TODO: The override above takes a Connection rather than a Configuration only because the
    // Connection is a short-circuit connection. That is not going to be good in all cases, when
    // master and meta are not colocated. Fix when this favored nodes feature is actually used
    // someday.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      updateMetaWithFavoredNodesInfo(regionToFavoredNodes, conn);
    }
  }

  private static Put makePut(RegionInfo regionInfo, List<ServerName> favoredNodeList)
    throws IOException {
    if (CollectionUtils.isEmpty(favoredNodeList)) {
      return null;
    }
    long time = EnvironmentEdgeManager.currentTime();
    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
    byte[] favoredNodes = getFavoredNodes(favoredNodeList);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(FAVOREDNODES_QUALIFIER).setTimestamp(time)
      .setType(Cell.Type.Put).setValue(favoredNodes).build());
    LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
      favoredNodeList);
    return put;
  }

  /**
   * Convert PB bytes to ServerName.
   * @param favoredNodes The PB'ed bytes of favored nodes
   * @return the array of {@link ServerName} for the byte array of favored nodes.
   */
  public static ServerName[] getFavoredNodesList(byte[] favoredNodes) throws IOException {
    FavoredNodes f = FavoredNodes.parseFrom(favoredNodes);
    List<HBaseProtos.ServerName> protoNodes = f.getFavoredNodeList();
    ServerName[] servers = new ServerName[protoNodes.size()];
    int i = 0;
    for (HBaseProtos.ServerName node : protoNodes) {
      servers[i++] = ProtobufUtil.toServerName(node);
    }
    return servers;
  }

  /** Returns PB'ed bytes of {@link FavoredNodes} generated by the server list. */
  public static byte[] getFavoredNodes(List<ServerName> serverAddrList) {
    FavoredNodes.Builder f = FavoredNodes.newBuilder();
    for (ServerName s : serverAddrList) {
      HBaseProtos.ServerName.Builder b = HBaseProtos.ServerName.newBuilder();
      b.setHostName(s.getHostname());
      b.setPort(s.getPort());
      b.setStartCode(ServerName.NON_STARTCODE);
      f.addFavoredNode(b.build());
    }
    return f.build().toByteArray();
  }

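  /*
   * Illustrative round trip (fn1/fn2/fn3 are placeholder ServerNames for this example): the bytes
   * written into the "fn" qualifier of hbase:meta can be decoded back with getFavoredNodesList().
   * Note that start codes are intentionally dropped on serialization, so every decoded entry
   * carries ServerName.NON_STARTCODE and compares by host and port only.
   *
   *   byte[] bytes = FavoredNodeAssignmentHelper.getFavoredNodes(Lists.newArrayList(fn1, fn2, fn3));
   *   ServerName[] decoded = FavoredNodeAssignmentHelper.getFavoredNodesList(bytes);
   */
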
  // Place the regions round-robin across the racks, picking one server from each
  // rack at a time. Start with a random rack, and a random server from every rack.
  // If a rack doesn't have enough servers it will go to the next rack and so on
  // for choosing a primary.
  // For example, with 4 racks (r1 .. r4) of 8 servers (s1..s8) each, one possible
  // placement could be r2:s5, r3:s5, r4:s5, r1:s5, r2:s6, r3:s6..
  // If there were fewer servers in one rack, say r3 had only 3 servers, one possible
  // placement could be r2:s5, <skip-r3>, r4:s5, r1:s5, r2:s6, <skip-r3> ...
  // The regions should be distributed proportionately to the rack sizes.
  public void placePrimaryRSAsRoundRobin(Map<ServerName, List<RegionInfo>> assignmentMap,
    Map<RegionInfo, ServerName> primaryRSMap, List<RegionInfo> regions) {
    List<String> rackList = new ArrayList<>(rackToRegionServerMap.size());
    rackList.addAll(rackToRegionServerMap.keySet());
    int rackIndex = ThreadLocalRandom.current().nextInt(rackList.size());
    int maxRackSize = 0;
    for (Map.Entry<String, List<ServerName>> r : rackToRegionServerMap.entrySet()) {
      if (r.getValue().size() > maxRackSize) {
        maxRackSize = r.getValue().size();
      }
    }
    int numIterations = 0;
    // Initialize the current processing host index.
    int serverIndex = ThreadLocalRandom.current().nextInt(maxRackSize);
    for (RegionInfo regionInfo : regions) {
      List<ServerName> currentServerList;
      String rackName;
      while (true) {
        rackName = rackList.get(rackIndex);
        numIterations++;
        // Get the server list for the current rack
        currentServerList = rackToRegionServerMap.get(rackName);

        if (serverIndex >= currentServerList.size()) { // not enough machines in this rack
          if (numIterations % rackList.size() == 0) {
            if (++serverIndex >= maxRackSize) serverIndex = 0;
          }
          if (++rackIndex >= rackList.size()) {
            rackIndex = 0; // reset the rack index to 0
          }
        } else break;
      }

      // Get the region server at the current processing index
      ServerName currentServer = currentServerList.get(serverIndex);

      // Place the current region with the current primary region server
      primaryRSMap.put(regionInfo, currentServer);
      if (assignmentMap != null) {
        List<RegionInfo> regionsForServer = assignmentMap.get(currentServer);
        if (regionsForServer == null) {
          regionsForServer = new ArrayList<>();
          assignmentMap.put(currentServer, regionsForServer);
        }
        regionsForServer.add(regionInfo);
      }

      // Set the next processing index
      if (numIterations % rackList.size() == 0) {
        ++serverIndex;
      }
      if (++rackIndex >= rackList.size()) {
        rackIndex = 0; // reset the rack index to 0
      }
    }
  }

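  /*
   * Illustrative two-step pipeline (a sketch; helper, regions, primaryRSMap and assignmentMap are
   * placeholder names for this example): placePrimaryRSAsRoundRobin() fills primaryRSMap with one
   * primary per region (and optionally groups regions per server in assignmentMap), then
   * placeSecondaryAndTertiaryRS() derives the remaining two favored nodes for each primary.
   * generateFavoredNodesRoundRobin() further below wraps this same sequence.
   *
   *   Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
   *   Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
   *   helper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
   *   Map<RegionInfo, ServerName[]> secondaryAndTertiary =
   *     helper.placeSecondaryAndTertiaryRS(primaryRSMap); // value arrays are {secondary, tertiary}
   */
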
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryRS(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();
    for (Map.Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Create the secondary and tertiary region server pair object.
        ServerName[] favoredNodes = getSecondaryAndTertiary(regionInfo, primaryRS);
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Place the secondary and tertiary region server for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }

  public ServerName[] getSecondaryAndTertiary(RegionInfo regionInfo, ServerName primaryRS)
    throws IOException {
    // Get the rack for the primary region server
    ServerName[] favoredNodes;
    String primaryRack = getRackOfServer(primaryRS);

    if (getTotalNumberOfRacks() == 1) {
      favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
    } else {
      favoredNodes = multiRackCase(primaryRS, primaryRack);
    }
    return favoredNodes;
  }

  private Map<ServerName, Set<RegionInfo>>
    mapRSToPrimaries(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> primaryServerMap = new HashMap<>();
    for (Entry<RegionInfo, ServerName> e : primaryRSMap.entrySet()) {
      Set<RegionInfo> currentSet = primaryServerMap.get(e.getValue());
      if (currentSet == null) {
        currentSet = new HashSet<>();
      }
      currentSet.add(e.getKey());
      primaryServerMap.put(e.getValue(), currentSet);
    }
    return primaryServerMap;
  }

  /**
   * For regions that share the primary, avoid placing the secondary and tertiary on the same RS.
   * Used for generating new assignments for the primary/secondary/tertiary RegionServers
   * @return the map of regions to the servers the region-files should be hosted on
   */
  public Map<RegionInfo, ServerName[]>
    placeSecondaryAndTertiaryWithRestrictions(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<ServerName, Set<RegionInfo>> serverToPrimaries = mapRSToPrimaries(primaryRSMap);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap = new HashMap<>();

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      // Get the target region and its primary region server rack
      RegionInfo regionInfo = entry.getKey();
      ServerName primaryRS = entry.getValue();
      try {
        // Get the rack for the primary region server
        String primaryRack = getRackOfServer(primaryRS);
        ServerName[] favoredNodes = null;
        if (getTotalNumberOfRacks() == 1) {
          // Single rack case: have to pick the secondary and tertiary
          // from the same rack
          favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack);
        } else {
          favoredNodes = multiRackCaseWithRestrictions(serverToPrimaries, secondaryAndTertiaryMap,
            primaryRack, primaryRS, regionInfo);
        }
        if (favoredNodes != null) {
          secondaryAndTertiaryMap.put(regionInfo, favoredNodes);
          LOG.debug("Place the secondary and tertiary region server for region "
            + regionInfo.getRegionNameAsString());
        }
      } catch (Exception e) {
        LOG.warn("Cannot place the favored nodes for region " + regionInfo.getRegionNameAsString()
          + " because " + e, e);
        continue;
      }
    }
    return secondaryAndTertiaryMap;
  }

  private ServerName[] multiRackCaseWithRestrictions(
    Map<ServerName, Set<RegionInfo>> serverToPrimaries,
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap, String primaryRack, ServerName primaryRS,
    RegionInfo regionInfo) throws IOException {
    // Randomly choose the secondary and tertiary region server
    // from another rack to place the secondary and tertiary.
    // Randomly choose one rack except for the current rack.
    Set<String> rackSkipSet = new HashSet<>();
    rackSkipSet.add(primaryRack);
    String secondaryRack = getOneRandomRack(rackSkipSet);
    List<ServerName> serverList = getServersFromRack(secondaryRack);
    Set<ServerName> serverSet = new HashSet<>(serverList);
    ServerName[] favoredNodes;
    if (serverList.size() >= 2) {
      // Randomly pick up two servers from this secondary rack.
      // Skip the secondary for the tertiary placement.
      // Skip the servers which already share the primary.
      Set<RegionInfo> primaries = serverToPrimaries.get(primaryRS);
      Set<ServerName> skipServerSet = new HashSet<>();
      while (true) {
        ServerName[] secondaryAndTertiary = null;
        if (primaries.size() > 1) {
          // check where the secondary and tertiary of the other regions sharing this primary are
          for (RegionInfo primary : primaries) {
            secondaryAndTertiary = secondaryAndTertiaryMap.get(primary);
            if (secondaryAndTertiary != null) {
              if (getRackOfServer(secondaryAndTertiary[0]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[0]);
              }
              if (getRackOfServer(secondaryAndTertiary[1]).equals(secondaryRack)) {
                skipServerSet.add(secondaryAndTertiary[1]);
              }
            }
          }
        }
        if (skipServerSet.size() + 2 <= serverSet.size()) break;
        skipServerSet.clear();
        rackSkipSet.add(secondaryRack);
        // we used all racks
        if (rackSkipSet.size() == getTotalNumberOfRacks()) {
          // remove the last two added and break
          skipServerSet.remove(secondaryAndTertiary[0]);
          skipServerSet.remove(secondaryAndTertiary[1]);
          break;
        }
        secondaryRack = getOneRandomRack(rackSkipSet);
        serverList = getServersFromRack(secondaryRack);
        serverSet = new HashSet<>(serverList);
      }

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(secondaryRack, skipServerSet);
      skipServerSet.add(secondaryRS);
      // Place the tertiary RS
      ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary and tertiary region server for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
    } else {
      // Pick the secondary rs from this secondary rack
      // and pick the tertiary from another random rack
      favoredNodes = new ServerName[2];
      ServerName secondary = getOneRandomServer(secondaryRack);
      favoredNodes[0] = secondary;

      // Pick the tertiary
      if (getTotalNumberOfRacks() == 2) {
        // Pick the tertiary from the same rack as the primary RS
        Set<ServerName> serverSkipSet = new HashSet<>();
        serverSkipSet.add(primaryRS);
        favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet);
      } else {
        // Pick the tertiary from another rack
        rackSkipSet.add(secondaryRack);
        String tertiaryRandomRack = getOneRandomRack(rackSkipSet);
        favoredNodes[1] = getOneRandomServer(tertiaryRandomRack);
      }
    }
    return favoredNodes;
  }

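  /*
   * Worked example for the restriction above (hypothetical servers/racks, for illustration only):
   * suppose regions A and B share primary rs1 on rack r1, and region A was already given
   * {rs5, rs6} on rack r2 as its secondary/tertiary. When region B also draws r2 as its secondary
   * rack, rs5 and rs6 land in skipServerSet, so B's secondary/tertiary are picked from the
   * remaining servers of r2 (or from another rack if fewer than two remain). This keeps regions
   * that share a primary from also sharing their secondary/tertiary servers.
   */
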
  private ServerName[] singleRackCase(RegionInfo regionInfo, ServerName primaryRS,
    String primaryRack) throws IOException {
    // Single rack case: have to pick the secondary and tertiary
    // from the same rack
    List<ServerName> serverList = getServersFromRack(primaryRack);
    if ((serverList == null) || (serverList.size() <= 2)) {
      // Single region server case: cannot place the favored nodes
      // on any server;
      return null;
    } else {
      // Randomly select two region servers from the server list and make sure
      // they do not overlap with the primary region server;
      Set<ServerName> serverSkipSet = new HashSet<>();
      serverSkipSet.add(primaryRS);

      // Place the secondary RS
      ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet);
      // Skip the secondary for the tertiary placement
      serverSkipSet.add(secondaryRS);
      ServerName tertiaryRS = getOneRandomServer(primaryRack, serverSkipSet);

      if (secondaryRS == null || tertiaryRS == null) {
        LOG.error("Cannot place the secondary, tertiary favored node for region "
          + regionInfo.getRegionNameAsString());
      }
      // Create the secondary and tertiary pair
      ServerName[] favoredNodes = new ServerName[2];
      favoredNodes[0] = secondaryRS;
      favoredNodes[1] = tertiaryRS;
      return favoredNodes;
    }
  }

  /**
   * Place secondary and tertiary nodes in a multi rack case. If there are only two racks, then we
   * try to place the secondary and tertiary on a different rack than the primary. But if the other
   * rack has only one region server, then we place the primary and tertiary on one rack and the
   * secondary on another. The aim is to distribute the three favored nodes on >= 2 racks.
   * TODO: see how we can use the generateMissingFavoredNodeMultiRack API here.
   * @param primaryRS   The primary favored node.
   * @param primaryRack The rack of the primary favored node.
   * @return Array containing secondary and tertiary favored nodes.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private ServerName[] multiRackCase(ServerName primaryRS, String primaryRack) throws IOException {
    List<ServerName> favoredNodes = Lists.newArrayList(primaryRS);
    // Create the secondary and tertiary pair
    ServerName secondaryRS = generateMissingFavoredNodeMultiRack(favoredNodes);
    favoredNodes.add(secondaryRS);
    String secondaryRack = getRackOfServer(secondaryRS);

    ServerName tertiaryRS;
    if (primaryRack.equals(secondaryRack)) {
      tertiaryRS = generateMissingFavoredNode(favoredNodes);
    } else {
      // Try to place the tertiary in the secondary RS rack, else place it on the primary rack.
      tertiaryRS = getOneRandomServer(secondaryRack, Sets.newHashSet(secondaryRS));
      if (tertiaryRS == null) {
        tertiaryRS = getOneRandomServer(primaryRack, Sets.newHashSet(primaryRS));
      }
      // We couldn't find anything in the secondary rack, get any FN
      if (tertiaryRS == null) {
        tertiaryRS = generateMissingFavoredNode(Lists.newArrayList(primaryRS, secondaryRS));
      }
    }
    return new ServerName[] { secondaryRS, tertiaryRS };
  }

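  /*
   * Step-through of multiRackCase() above (illustrative; rack names r1/r2 are hypothetical): with
   * the primary on r1, the secondary comes from the multi-rack generator and may land on any rack.
   * If it also lands on r1, the tertiary is forced onto a different rack via
   * generateMissingFavoredNode(), giving a layout like r1,r1 | r2. If the secondary lands on r2,
   * the tertiary is preferred from r2, falls back to r1, and finally to any free node. Either way
   * the three favored nodes span at least two racks whenever the cluster allows it.
   */
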
  public boolean canPlaceFavoredNodes() {
    return (this.servers.size() >= FAVORED_NODES_NUM);
  }

  private int getTotalNumberOfRacks() {
    return this.uniqueRackList.size();
  }

  private List<ServerName> getServersFromRack(String rack) {
    return this.rackToRegionServerMap.get(rack);
  }

  /**
   * Gets a random server from the specified rack and skips anything specified.
   * @param rack          rack from which a server is needed
   * @param skipServerSet the returned server should not belong to this set
   */
  protected ServerName getOneRandomServer(String rack, Set<ServerName> skipServerSet) {
    // Is the rack valid? Do we recognize it?
    if (rack == null || getServersFromRack(rack) == null || getServersFromRack(rack).isEmpty()) {
      return null;
    }

    // Lets use a set so we can eliminate duplicates
    Set<StartcodeAgnosticServerName> serversToChooseFrom = Sets.newHashSet();
    for (ServerName sn : getServersFromRack(rack)) {
      serversToChooseFrom.add(StartcodeAgnosticServerName.valueOf(sn));
    }

    if (skipServerSet != null && skipServerSet.size() > 0) {
      for (ServerName sn : skipServerSet) {
        serversToChooseFrom.remove(StartcodeAgnosticServerName.valueOf(sn));
      }
      // Do we have any servers left to choose from?
      if (serversToChooseFrom.isEmpty()) {
        return null;
      }
    }

    ServerName randomServer = null;
    int randomIndex = ThreadLocalRandom.current().nextInt(serversToChooseFrom.size());
    int j = 0;
    for (StartcodeAgnosticServerName sn : serversToChooseFrom) {
      if (j == randomIndex) {
        randomServer = sn;
        break;
      }
      j++;
    }

    if (randomServer != null) {
      return ServerName.valueOf(randomServer.getAddress(), randomServer.getStartcode());
    } else {
      return null;
    }
  }

  private ServerName getOneRandomServer(String rack) throws IOException {
    return this.getOneRandomServer(rack, null);
  }

  String getOneRandomRack(Set<String> skipRackSet) throws IOException {
    if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
      throw new IOException("Cannot randomly pick another rack");
    }

    String randomRack;
    do {
      int randomIndex = ThreadLocalRandom.current().nextInt(this.uniqueRackList.size());
      randomRack = this.uniqueRackList.get(randomIndex);
    } while (skipRackSet.contains(randomRack));

    return randomRack;
  }

  public static String getFavoredNodesAsString(List<ServerName> nodes) {
    StringBuilder strBuf = new StringBuilder();
    int i = 0;
    for (ServerName node : nodes) {
      strBuf.append(node.getAddress());
      if (++i != nodes.size()) strBuf.append(";");
    }
    return strBuf.toString();
  }

  /*
   * Generates a missing favored node based on the input favored nodes. This helps to generate a
   * new FN when there are already 2 FN and we need a third one. For example, while generating new
   * FN for split daughters after inheriting 2 FN from the parent. If the cluster has only one rack
   * it generates from the same rack. If the cluster has multiple racks, then it ensures the new FN
   * respects the rack constraints similar to HDFS. For example: if there are 3 FN, they will be
   * spread across 2 racks.
   */
  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, null);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, null);
    }
  }

  public ServerName generateMissingFavoredNode(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    if (this.uniqueRackList.size() == 1) {
      return generateMissingFavoredNodeSingleRack(favoredNodes, excludeNodes);
    } else {
      return generateMissingFavoredNodeMultiRack(favoredNodes, excludeNodes);
    }
  }

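  /*
   * Illustrative use of generateMissingFavoredNode() (a sketch; helper, inheritedFromParent,
   * remainingFavoredNodes and deadServer are placeholder variables): a split daughter that
   * inherited two favored nodes from its parent gets a rack-aware third one, and a replacement for
   * a dead favored node can be generated while excluding the dead server.
   *
   *   ServerName third = helper.generateMissingFavoredNode(inheritedFromParent);
   *   ServerName replacement =
   *     helper.generateMissingFavoredNode(remainingFavoredNodes, Lists.newArrayList(deadServer));
   */
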
  /*
   * Generate a FN for a single rack scenario, without generating from one of the excluded nodes.
   * Helps when we would like to find a replacement node.
   */
  private ServerName generateMissingFavoredNodeSingleRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    ServerName newServer = null;
    Set<ServerName> excludeFNSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      excludeFNSet.addAll(excludeNodes);
    }
    if (favoredNodes.size() < FAVORED_NODES_NUM) {
      newServer = this.getOneRandomServer(this.uniqueRackList.get(0), excludeFNSet);
    }
    return newServer;
  }

  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes)
    throws IOException {
    return generateMissingFavoredNodeMultiRack(favoredNodes, null);
  }

  /*
   * Generates a missing FN based on the input favoredNodes and also the nodes to be skipped. Get
   * the current layout of favored nodes arrangement and nodes to be excluded and get a random node
   * that goes with HDFS block placement. E.g. if the existing nodes are on one rack, generate one
   * from another rack. We exclude as much as possible so the random selection has more chance to
   * generate a node within a few iterations, ideally 1.
   */
  private ServerName generateMissingFavoredNodeMultiRack(List<ServerName> favoredNodes,
    List<ServerName> excludeNodes) throws IOException {
    Set<String> racks = Sets.newHashSet();
    Map<String, Set<ServerName>> rackToFNMapping = new HashMap<>();

    // Lets understand the current rack distribution of the FN
    for (ServerName sn : favoredNodes) {
      String rack = getRackOfServer(sn);
      racks.add(rack);

      Set<ServerName> serversInRack = rackToFNMapping.get(rack);
      if (serversInRack == null) {
        serversInRack = Sets.newHashSet();
        rackToFNMapping.put(rack, serversInRack);
      }
      serversInRack.add(sn);
    }

    // What racks should be skipped while getting a FN?
    Set<String> skipRackSet = Sets.newHashSet();

    /*
     * If both the FN are from the same rack, then we don't want to generate another FN on the same
     * rack. If that rack fails, the region would be unavailable.
     */
    if (racks.size() == 1 && favoredNodes.size() > 1) {
      skipRackSet.add(racks.iterator().next());
    }

    /*
     * If there are no free nodes on the existing racks, we should skip those racks too. We can
     * reduce the number of iterations for FN selection.
     */
    for (String rack : racks) {
      if (
        getServersFromRack(rack) != null
          && rackToFNMapping.get(rack).size() == getServersFromRack(rack).size()
      ) {
        skipRackSet.add(rack);
      }
    }

    Set<ServerName> favoredNodeSet = Sets.newHashSet(favoredNodes);
    if (excludeNodes != null && excludeNodes.size() > 0) {
      favoredNodeSet.addAll(excludeNodes);
    }

    /*
     * Lets get a random rack by excluding skipRackSet and generate a random FN from that rack.
     */
    int i = 0;
    Set<String> randomRacks = Sets.newHashSet();
    ServerName newServer = null;
    do {
      String randomRack = this.getOneRandomRack(skipRackSet);
      newServer = this.getOneRandomServer(randomRack, favoredNodeSet);
      randomRacks.add(randomRack);
      i++;
    } while ((i < MAX_ATTEMPTS_FN_GENERATION) && (newServer == null));

    if (newServer == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(String.format(
          "Unable to generate additional favored nodes for %s after "
            + "considering racks %s and skip rack %s with a unique rack list of %s and rack "
            + "to RS map of %s and RS to rack map of %s",
          StringUtils.join(favoredNodes, ","), randomRacks, skipRackSet, uniqueRackList,
          rackToRegionServerMap, regionServerToRackMap));
      }
      throw new IOException(
        "Unable to generate additional favored nodes for " + StringUtils.join(favoredNodes, ","));
    }
    return newServer;
  }

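  /*
   * Worked example for the multi-rack generator above (hypothetical racks/servers, illustration
   * only): with favoredNodes = {r1:rs1, r1:rs2}, rack r1 goes into skipRackSet, so the generated
   * node is drawn from some other rack and the resulting three favored nodes span two racks. With
   * favoredNodes = {r1:rs1, r2:rs5}, neither rack is skipped (unless it has no free servers left),
   * so the third node may land on either of those racks or on a third one.
   */
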
  /*
   * Generate favored nodes for a region. Choose a random server as the primary and then choose the
   * secondary and tertiary FN so that they are spread across two racks.
   */
  public List<ServerName> generateFavoredNodes(RegionInfo hri) throws IOException {
    List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
    ServerName primary = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    favoredNodesForRegion.add(ServerName.valueOf(primary.getAddress(), ServerName.NON_STARTCODE));

    Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>(1);
    primaryRSMap.put(hri, primary);
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);
    ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri);
    if (secondaryAndTertiaryNodes != null && secondaryAndTertiaryNodes.length == 2) {
      for (ServerName sn : secondaryAndTertiaryNodes) {
        favoredNodesForRegion.add(ServerName.valueOf(sn.getAddress(), ServerName.NON_STARTCODE));
      }
      return favoredNodesForRegion;
    } else {
      throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes.");
    }
  }

  public Map<RegionInfo, List<ServerName>> generateFavoredNodesRoundRobin(
    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
    if (regions.size() > 0) {
      if (canPlaceFavoredNodes()) {
        Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
        // Lets try to have an equal distribution for primary favored node
        placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
        return generateFavoredNodes(primaryRSMap);
      } else {
        throw new HBaseIOException("Not enough nodes to generate favored nodes");
      }
    }
    return null;
  }

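  /*
   * Illustrative bulk call (a sketch; helper and newRegions are placeholders, e.g. the regions of
   * a table being created): returns a full primary/secondary/tertiary list per region, throws when
   * fewer than FAVORED_NODES_NUM servers are available, and returns null for an empty region list.
   *
   *   Map<ServerName, List<RegionInfo>> assignmentMap = new HashMap<>();
   *   Map<RegionInfo, List<ServerName>> favoredNodes =
   *     helper.generateFavoredNodesRoundRobin(assignmentMap, newRegions);
   */
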
  /*
   * Generate favored nodes for a set of regions when we know where they are currently hosted.
   */
  private Map<RegionInfo, List<ServerName>>
    generateFavoredNodes(Map<RegionInfo, ServerName> primaryRSMap) {
    Map<RegionInfo, List<ServerName>> generatedFavNodes = new HashMap<>();
    Map<RegionInfo, ServerName[]> secondaryAndTertiaryRSMap =
      placeSecondaryAndTertiaryRS(primaryRSMap);

    for (Entry<RegionInfo, ServerName> entry : primaryRSMap.entrySet()) {
      List<ServerName> favoredNodesForRegion = new ArrayList<>(FAVORED_NODES_NUM);
      RegionInfo region = entry.getKey();
      ServerName primarySN = entry.getValue();
      favoredNodesForRegion
        .add(ServerName.valueOf(primarySN.getHostname(), primarySN.getPort(), NON_STARTCODE));
      ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region);
      if (secondaryAndTertiaryNodes != null) {
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(),
          secondaryAndTertiaryNodes[0].getPort(), NON_STARTCODE));
        favoredNodesForRegion.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(),
          secondaryAndTertiaryNodes[1].getPort(), NON_STARTCODE));
      }
      generatedFavNodes.put(region, favoredNodesForRegion);
    }
    return generatedFavNodes;
  }

  /**
   * Get the rack of a server from the local mapping when present; this saves a lookup by the
   * RackManager.
   */
  private String getRackOfServer(ServerName sn) {
    if (this.regionServerToRackMap.containsKey(sn.getHostname())) {
      return this.regionServerToRackMap.get(sn.getHostname());
    } else {
      String rack = this.rackManager.getRack(sn);
      this.regionServerToRackMap.put(sn.getHostname(), rack);
      return rack;
    }
  }

  public static int getDataNodePort(Configuration conf) {
    HdfsConfiguration.init();
    Configuration dnConf = new HdfsConfiguration(conf);
    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
    return dnPort;
  }
}