001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.favored; 019 020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE; 021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM; 022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY; 023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY; 024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.master.MasterServices; 037import org.apache.hadoop.hbase.master.RackManager; 038import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta; 039import org.apache.hadoop.hdfs.DFSConfigKeys; 040import org.apache.hadoop.hdfs.HdfsConfiguration; 041import org.apache.hadoop.net.NetUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 046import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 047import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 048import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 049 050/** 051 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and 052 * META table. Its the centralized store for all favored nodes information. All reads and updates 053 * should be done through this class. There should only be one instance of 054 * {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information 055 * from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except 056 * for tools that only read or fortest cases). All other classes including Favored balancers 057 * and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any 058 * read/write/deletes to favored nodes. 059 */ 060@InterfaceAudience.Private 061public class FavoredNodesManager { 062 private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class); 063 064 private final FavoredNodesPlan globalFavoredNodesAssignmentPlan; 065 private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap; 066 private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap; 067 private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap; 068 069 private final MasterServices masterServices; 070 private final RackManager rackManager; 071 072 /** 073 * Datanode port to be used for Favored Nodes. 074 */ 075 private int datanodeDataTransferPort; 076 077 public FavoredNodesManager(MasterServices masterServices) { 078 this.masterServices = masterServices; 079 this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); 080 this.primaryRSToRegionMap = new HashMap<>(); 081 this.secondaryRSToRegionMap = new HashMap<>(); 082 this.teritiaryRSToRegionMap = new HashMap<>(); 083 this.rackManager = new RackManager(masterServices.getConfiguration()); 084 } 085 086 public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) { 087 // Add snapshot to structures made on creation. Current structures may have picked 088 // up data between construction and the scan of meta needed before this method 089 // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time" 090 this.globalFavoredNodesAssignmentPlan. 091 updateFavoredNodesMap(snapshot.getExistingAssignmentPlan()); 092 primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap()); 093 secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap()); 094 teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap()); 095 datanodeDataTransferPort= getDataNodePort(); 096 } 097 098 @VisibleForTesting 099 public int getDataNodePort() { 100 HdfsConfiguration.init(); 101 102 Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration()); 103 104 int dnPort = NetUtils.createSocketAddr( 105 dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, 106 DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); 107 LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort); 108 return dnPort; 109 } 110 111 public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) { 112 return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo); 113 } 114 115 /* 116 * Favored nodes are not applicable for system tables. We will use this to check before 117 * we apply any favored nodes logic on a region. 118 */ 119 public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) { 120 return !regionInfo.getTable().isSystemTable(); 121 } 122 123 /** 124 * Filter and return regions for which favored nodes is not applicable. 125 * @return set of regions for which favored nodes is not applicable 126 */ 127 public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) { 128 return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet()); 129 } 130 131 /* 132 * This should only be used when sending FN information to the region servers. Instead of 133 * sending the region server port, we use the datanode port. This helps in centralizing the DN 134 * port logic in Master. The RS uses the port from the favored node list as hints. 135 */ 136 public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) { 137 if (getFavoredNodes(regionInfo) == null) { 138 return null; 139 } 140 141 List<ServerName> fnWithDNPort = Lists.newArrayList(); 142 for (ServerName sn : getFavoredNodes(regionInfo)) { 143 fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort, 144 NON_STARTCODE)); 145 } 146 return fnWithDNPort; 147 } 148 149 public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap) 150 throws IOException { 151 152 Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>(); 153 for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) { 154 RegionInfo regionInfo = entry.getKey(); 155 List<ServerName> servers = entry.getValue(); 156 157 /* 158 * None of the following error conditions should happen. If it does, there is an issue with 159 * favored nodes generation or the regions its called on. 160 */ 161 if (servers.size() != Sets.newHashSet(servers).size()) { 162 throw new IOException("Duplicates found: " + servers); 163 } 164 165 if (!isFavoredNodeApplicable(regionInfo)) { 166 throw new IOException("Can't update FN for a un-applicable region: " 167 + regionInfo.getRegionNameAsString() + " with " + servers); 168 } 169 170 if (servers.size() != FAVORED_NODES_NUM) { 171 throw new IOException("At least " + FAVORED_NODES_NUM 172 + " favored nodes should be present for region : " + regionInfo.getEncodedName() 173 + " current FN servers:" + servers); 174 } 175 176 List<ServerName> serversWithNoStartCodes = Lists.newArrayList(); 177 for (ServerName sn : servers) { 178 if (sn.getStartcode() == NON_STARTCODE) { 179 serversWithNoStartCodes.add(sn); 180 } else { 181 serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), 182 NON_STARTCODE)); 183 } 184 } 185 regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes); 186 } 187 188 // Lets do a bulk update to meta since that reduces the RPC's 189 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, 190 masterServices.getConnection()); 191 deleteFavoredNodesForRegions(regionToFavoredNodes.keySet()); 192 193 for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) { 194 RegionInfo regionInfo = entry.getKey(); 195 List<ServerName> serversWithNoStartCodes = entry.getValue(); 196 globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes); 197 addToReplicaLoad(regionInfo, serversWithNoStartCodes); 198 } 199 } 200 201 private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) { 202 ServerName serverToUse = 203 ServerName.valueOf(servers.get(PRIMARY.ordinal()).getAddress().toString(), NON_STARTCODE); 204 List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse); 205 if (regionList == null) { 206 regionList = new ArrayList<>(); 207 } 208 regionList.add(hri); 209 primaryRSToRegionMap.put(serverToUse, regionList); 210 211 serverToUse = ServerName 212 .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE); 213 regionList = secondaryRSToRegionMap.get(serverToUse); 214 if (regionList == null) { 215 regionList = new ArrayList<>(); 216 } 217 regionList.add(hri); 218 secondaryRSToRegionMap.put(serverToUse, regionList); 219 220 serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(), 221 NON_STARTCODE); 222 regionList = teritiaryRSToRegionMap.get(serverToUse); 223 if (regionList == null) { 224 regionList = new ArrayList<>(); 225 } 226 regionList.add(hri); 227 teritiaryRSToRegionMap.put(serverToUse, regionList); 228 } 229 230 /* 231 * Get the replica count for the servers provided. 232 * 233 * For each server, replica count includes three counts for primary, secondary and tertiary. 234 * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary 235 * for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is 236 * not a favored node for any region, the replica count would be [0, 0, 0]. 237 */ 238 public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) { 239 Map<ServerName, List<Integer>> result = Maps.newHashMap(); 240 for (ServerName sn : servers) { 241 ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), NON_STARTCODE); 242 List<Integer> countList = Lists.newArrayList(); 243 if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 244 countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size()); 245 } else { 246 countList.add(0); 247 } 248 if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 249 countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size()); 250 } else { 251 countList.add(0); 252 } 253 if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) { 254 countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size()); 255 } else { 256 countList.add(0); 257 } 258 result.put(sn, countList); 259 } 260 return result; 261 } 262 263 public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) { 264 List<ServerName> favNodes = getFavoredNodes(regionInfo); 265 if (favNodes != null) { 266 if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) { 267 primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo); 268 } 269 if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) { 270 secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo); 271 } 272 if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) { 273 teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo); 274 } 275 globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo); 276 } 277 } 278 279 public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) { 280 for (RegionInfo regionInfo : regionInfoList) { 281 deleteFavoredNodesForRegion(regionInfo); 282 } 283 } 284 285 @VisibleForTesting 286 public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) { 287 Set<RegionInfo> regionInfos = Sets.newHashSet(); 288 289 ServerName serverToUse = ServerName.valueOf(serverName.getHostAndPort(), NON_STARTCODE); 290 if (primaryRSToRegionMap.containsKey(serverToUse)) { 291 regionInfos.addAll(primaryRSToRegionMap.get(serverToUse)); 292 } 293 if (secondaryRSToRegionMap.containsKey(serverToUse)) { 294 regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse)); 295 } 296 if (teritiaryRSToRegionMap.containsKey(serverToUse)) { 297 regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse)); 298 } 299 return regionInfos; 300 } 301 302 public RackManager getRackManager() { 303 return rackManager; 304 } 305}