/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.favored;

import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and
 * META table. It is the centralized store for all favored nodes information. All reads and
 * updates should be done through this class. There should only be one instance of
 * {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information
 * from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except
 * maybe for tools that only read, or test cases). All other classes including Favored balancers
 * and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
 * read/write/deletes to favored nodes.
 */
@InterfaceAudience.Private
public class FavoredNodesManager {

  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);

  /** Region to favored-nodes plan held in memory. */
  private FavoredNodesPlan globalFavoredNodesAssignmentPlan;

  /*
   * Per-position reverse indexes: favored node server -> regions that list it in that position.
   * Every method of this class that adds to or reads from these maps uses a ServerName built
   * with NON_STARTCODE as the key.
   */
  private Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
  private Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
  private Map<ServerName, List<RegionInfo>> tertiaryRSToRegionMap;

  private final MasterServices masterServices;
  private final RackManager rackManager;

  /**
   * Datanode port to be used for Favored Nodes.
   */
  private int datanodeDataTransferPort;

  /**
   * Creates a manager with an empty plan and empty per-position maps.
   *
   * @param masterServices master services; supplies the configuration (used here for the
   *          {@link RackManager} and later for the datanode port) and the connection used when
   *          favored nodes are written to meta
   */
  public FavoredNodesManager(MasterServices masterServices) {
    this.masterServices = masterServices;
    this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
    this.primaryRSToRegionMap = new HashMap<>();
    this.secondaryRSToRegionMap = new HashMap<>();
    this.tertiaryRSToRegionMap = new HashMap<>();
    this.rackManager = new RackManager(masterServices.getConfiguration());
  }

  /**
   * Replaces the in-memory plan and the three per-position maps with the ones carried by the
   * given snapshot, then resolves and stores the datanode port.
   *
   * @param snapshotOfRegionAssignment snapshot to load the favored nodes state from
   * @throws HBaseIOException declared for callers; nothing in this method throws it directly
   */
  public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment)
      throws HBaseIOException {
    globalFavoredNodesAssignmentPlan = snapshotOfRegionAssignment.getExistingAssignmentPlan();
    primaryRSToRegionMap = snapshotOfRegionAssignment.getPrimaryToRegionInfoMap();
    secondaryRSToRegionMap = snapshotOfRegionAssignment.getSecondaryToRegionInfoMap();
    tertiaryRSToRegionMap = snapshotOfRegionAssignment.getTertiaryToRegionInfoMap();
    datanodeDataTransferPort = getDataNodePort();
  }

  /**
   * Reads the datanode address ({@link DFSConfigKeys#DFS_DATANODE_ADDRESS_KEY}, falling back to
   * {@link DFSConfigKeys#DFS_DATANODE_ADDRESS_DEFAULT}) from an {@link HdfsConfiguration} layered
   * over the master configuration and returns its port. This method does not modify any field
   * of this class.
   *
   * @return the port part of the configured datanode address
   */
  public int getDataNodePort() {
    HdfsConfiguration.init();

    Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
    String dnAddress = dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT);
    int dnPort = NetUtils.createSocketAddr(dnAddress).getPort();

    // Log the value that was just resolved; the datanodeDataTransferPort field may not have
    // been assigned yet when this is called from initialize().
    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
    return dnPort;
  }

  /**
   * Looks up the favored nodes of a region in the in-memory plan.
   *
   * @param regionInfo region to look up
   * @return whatever the plan returns for the region; callers in this class treat {@code null}
   *         as "no favored nodes recorded"
   */
  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
    return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
  }

  /**
   * Favored nodes are not applicable for system tables. We will use this to check before
   * we apply any favored nodes logic on a region.
   *
   * @param regionInfo region to check
   * @return {@code true} unless the region belongs to a system table
   */
  public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) {
    return !regionInfo.getTable().isSystemTable();
  }

  /**
   * Filter and return regions for which favored nodes is not applicable.
   *
   * @param regions - collection of regions
   * @return set of regions for which favored nodes is not applicable
   */
  public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) {
    Set<RegionInfo> nonApplicableRegions = Sets.newHashSet();
    for (RegionInfo regionInfo : regions) {
      if (!isFavoredNodeApplicable(regionInfo)) {
        nonApplicableRegions.add(regionInfo);
      }
    }
    return nonApplicableRegions;
  }

  /**
   * This should only be used when sending FN information to the region servers. Instead of
   * sending the region server port, we use the datanode port. This helps in centralizing the DN
   * port logic in Master. The RS uses the port from the favored node list as hints.
   *
   * @param regionInfo region to look up
   * @return the region's favored nodes rebuilt with the datanode port and
   *         {@code NON_STARTCODE}, or {@code null} if the region has no favored nodes
   */
  public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) {
    // Look the plan up once and reuse the result for both the null check and the iteration.
    List<ServerName> favoredNodes = getFavoredNodes(regionInfo);
    if (favoredNodes == null) {
      return null;
    }

    List<ServerName> fnWithDNPort = new ArrayList<>(favoredNodes.size());
    for (ServerName sn : favoredNodes) {
      fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
        NON_STARTCODE));
    }
    return fnWithDNPort;
  }

  /**
   * Validates the given favored nodes, writes them to meta in one bulk call and then refreshes
   * the in-memory plan and per-position maps for the affected regions.
   * <p>
   * All entries are validated before anything is written. An entry is rejected if its server
   * list contains duplicates, if favored nodes are not applicable to the region, or if the list
   * does not hold exactly {@code FAVORED_NODES_NUM} servers. Servers are stored with their start
   * code replaced by {@code NON_STARTCODE}.
   *
   * @param regionFNMap region to favored nodes, in primary/secondary/tertiary order
   * @throws IOException if an entry fails validation, or if thrown by the meta update
   */
  public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
      throws IOException {

    Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
      RegionInfo regionInfo = entry.getKey();
      List<ServerName> servers = entry.getValue();

      /*
       * None of the following error conditions should happen. If it does, there is an issue with
       * favored nodes generation or the regions its called on.
       */
      if (servers.size() != Sets.newHashSet(servers).size()) {
        throw new IOException("Duplicates found: " + servers);
      }

      if (!isFavoredNodeApplicable(regionInfo)) {
        throw new IOException("Can't update FN for a un-applicable region: "
            + regionInfo.getRegionNameAsString() + " with " + servers);
      }

      if (servers.size() != FAVORED_NODES_NUM) {
        throw new IOException("At least " + FAVORED_NODES_NUM
            + " favored nodes should be present for region : " + regionInfo.getEncodedName()
            + " current FN servers:" + servers);
      }

      List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
      for (ServerName sn : servers) {
        if (sn.getStartcode() == NON_STARTCODE) {
          serversWithNoStartCodes.add(sn);
        } else {
          serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
            NON_STARTCODE));
        }
      }
      regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
    }

    // Lets do a bulk update to meta since that reduces the RPC's
    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(
      regionToFavoredNodes,
      masterServices.getConnection());

    // Drop the previous in-memory entries first so the per-position maps do not keep a region
    // listed under its old favored nodes.
    deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());

    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
      RegionInfo regionInfo = entry.getKey();
      List<ServerName> serversWithNoStartCodes = entry.getValue();
      globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes);
      addToReplicaLoad(regionInfo, serversWithNoStartCodes);
    }
  }

  /*
   * Records the region under its primary, secondary and tertiary favored node in the matching
   * per-position map. The servers list is indexed by the ordinal of each Position.
   */
  private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) {
    addRegionForServer(primaryRSToRegionMap, servers.get(PRIMARY.ordinal()), hri);
    addRegionForServer(secondaryRSToRegionMap, servers.get(SECONDARY.ordinal()), hri);
    addRegionForServer(tertiaryRSToRegionMap, servers.get(TERTIARY.ordinal()), hri);
  }

  /*
   * Get the replica count for the servers provided.
   *
   * For each server, replica count includes three counts for primary, secondary and tertiary.
   * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
   * for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is
   * not a favored node for any region, the replica count would be [0, 0, 0].
   *
   * The result is keyed by the ServerName instances passed in, not by the start-code-less
   * form used for the lookups.
   */
  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
    Map<ServerName, List<Integer>> result = Maps.newHashMap();
    for (ServerName sn : servers) {
      ServerName serverWithNoStartCode = withoutStartCode(sn);
      List<Integer> countList = Lists.newArrayList();
      countList.add(regionCount(primaryRSToRegionMap, serverWithNoStartCode));
      countList.add(regionCount(secondaryRSToRegionMap, serverWithNoStartCode));
      countList.add(regionCount(tertiaryRSToRegionMap, serverWithNoStartCode));
      result.put(sn, countList);
    }
    return result;
  }

  /**
   * Removes a region from the in-memory plan and from the per-position maps. Does nothing if
   * the plan holds no favored nodes for the region. Meta is not touched by this method.
   *
   * @param regionInfo region whose favored nodes should be forgotten
   */
  public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) {
    List<ServerName> favNodes = getFavoredNodes(regionInfo);
    if (favNodes == null) {
      return;
    }
    removeRegionForServer(primaryRSToRegionMap, favNodes.get(PRIMARY.ordinal()), regionInfo);
    removeRegionForServer(secondaryRSToRegionMap, favNodes.get(SECONDARY.ordinal()), regionInfo);
    removeRegionForServer(tertiaryRSToRegionMap, favNodes.get(TERTIARY.ordinal()), regionInfo);
    globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo);
  }

  /**
   * Calls {@link #deleteFavoredNodesForRegion(RegionInfo)} for every region in the collection.
   *
   * @param regionInfoList regions whose favored nodes should be forgotten
   */
  public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) {
    for (RegionInfo regionInfo : regionInfoList) {
      deleteFavoredNodesForRegion(regionInfo);
    }
  }

  /**
   * Collects the regions that list the given server as a favored node in any position.
   *
   * @param serverName server to look up; its start code is ignored
   * @return a new set with the union of the server's primary, secondary and tertiary regions
   */
  @VisibleForTesting
  public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) {
    Set<RegionInfo> regionInfos = Sets.newHashSet();

    ServerName serverToUse = withoutStartCode(serverName);
    addAllIfPresent(regionInfos, primaryRSToRegionMap.get(serverToUse));
    addAllIfPresent(regionInfos, secondaryRSToRegionMap.get(serverToUse));
    addAllIfPresent(regionInfos, tertiaryRSToRegionMap.get(serverToUse));
    return regionInfos;
  }

  /**
   * @return the {@link RackManager} created in the constructor
   */
  public RackManager getRackManager() {
    return rackManager;
  }

  /* Returns the key form used by the per-position maps: same host and port, NON_STARTCODE. */
  private static ServerName withoutStartCode(ServerName server) {
    return ServerName.valueOf(server.getHostAndPort(), NON_STARTCODE);
  }

  /* Appends the region to the server's list in the given map, creating the list if needed. */
  private static void addRegionForServer(Map<ServerName, List<RegionInfo>> serverToRegions,
      ServerName server, RegionInfo region) {
    ServerName key = withoutStartCode(server);
    List<RegionInfo> regions = serverToRegions.get(key);
    if (regions == null) {
      regions = new ArrayList<>();
      serverToRegions.put(key, regions);
    }
    regions.add(region);
  }

  /* Removes the region from the server's list in the given map, if the server has a list. */
  private static void removeRegionForServer(Map<ServerName, List<RegionInfo>> serverToRegions,
      ServerName server, RegionInfo region) {
    // Use the same start-code-less key as addRegionForServer so removal hits the same bucket.
    List<RegionInfo> regions = serverToRegions.get(withoutStartCode(server));
    if (regions != null) {
      regions.remove(region);
    }
  }

  /* Number of regions recorded for the key in the given map; 0 when there is no list. */
  private static int regionCount(Map<ServerName, List<RegionInfo>> serverToRegions,
      ServerName key) {
    List<RegionInfo> regions = serverToRegions.get(key);
    return regions == null ? 0 : regions.size();
  }

  /* Adds all regions to the target set; a null list is treated as empty. */
  private static void addAllIfPresent(Set<RegionInfo> target, List<RegionInfo> regions) {
    if (regions != null) {
      target.addAll(regions);
    }
  }
}