001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.favored;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.stream.Collectors;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.master.MasterServices;
038import org.apache.hadoop.hbase.master.RackManager;
039import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
040import org.apache.hadoop.hdfs.DFSConfigKeys;
041import org.apache.hadoop.hdfs.HdfsConfiguration;
042import org.apache.hadoop.net.NetUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
049import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
050
051/**
052 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and META
053 * table. Its the centralized store for all favored nodes information. All reads and updates should
054 * be done through this class. There should only be one instance of {@link FavoredNodesManager} in
055 * Master. {@link FavoredNodesPlan} and favored node information from
056 * {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except for
057 * tools that only read or fortest cases). All other classes including Favored balancers and
058 * {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
059 * read/write/deletes to favored nodes.
060 */
061@InterfaceAudience.Private
062public class FavoredNodesManager {
063  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
064
065  private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
066  private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
067  private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
068  private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap;
069
070  private final MasterServices masterServices;
071  private final RackManager rackManager;
072
073  /**
074   * Datanode port to be used for Favored Nodes.
075   */
076  private int datanodeDataTransferPort;
077
078  public FavoredNodesManager(MasterServices masterServices) {
079    this.masterServices = masterServices;
080    this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
081    this.primaryRSToRegionMap = new HashMap<>();
082    this.secondaryRSToRegionMap = new HashMap<>();
083    this.teritiaryRSToRegionMap = new HashMap<>();
084    this.rackManager = new RackManager(masterServices.getConfiguration());
085  }
086
087  public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) {
088    // Add snapshot to structures made on creation. Current structures may have picked
089    // up data between construction and the scan of meta needed before this method
090    // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
091    this.globalFavoredNodesAssignmentPlan
092      .updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
093    primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
094    secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
095    teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
096    datanodeDataTransferPort = getDataNodePort();
097  }
098
099  public int getDataNodePort() {
100    HdfsConfiguration.init();
101
102    Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
103
104    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
105      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
106    LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
107    return dnPort;
108  }
109
110  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
111    return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
112  }
113
114  /*
115   * Favored nodes are not applicable for system tables. We will use this to check before we apply
116   * any favored nodes logic on a region.
117   */
118  public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) {
119    return !regionInfo.getTable().isSystemTable();
120  }
121
122  /**
123   * Filter and return regions for which favored nodes is not applicable.
124   * @return set of regions for which favored nodes is not applicable
125   */
126  public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) {
127    return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet());
128  }
129
130  /*
131   * This should only be used when sending FN information to the region servers. Instead of sending
132   * the region server port, we use the datanode port. This helps in centralizing the DN port logic
133   * in Master. The RS uses the port from the favored node list as hints.
134   */
135  public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) {
136    if (getFavoredNodes(regionInfo) == null) {
137      return null;
138    }
139
140    List<ServerName> fnWithDNPort = Lists.newArrayList();
141    for (ServerName sn : getFavoredNodes(regionInfo)) {
142      fnWithDNPort
143        .add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort, NON_STARTCODE));
144    }
145    return fnWithDNPort;
146  }
147
148  public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
149    throws IOException {
150
151    Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
152    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
153      RegionInfo regionInfo = entry.getKey();
154      List<ServerName> servers = entry.getValue();
155
156      /*
157       * None of the following error conditions should happen. If it does, there is an issue with
158       * favored nodes generation or the regions its called on.
159       */
160      if (servers.size() != Sets.newHashSet(servers).size()) {
161        throw new IOException("Duplicates found: " + servers);
162      }
163
164      if (!isFavoredNodeApplicable(regionInfo)) {
165        throw new IOException("Can't update FN for a un-applicable region: "
166          + regionInfo.getRegionNameAsString() + " with " + servers);
167      }
168
169      if (servers.size() != FAVORED_NODES_NUM) {
170        throw new IOException(
171          "At least " + FAVORED_NODES_NUM + " favored nodes should be present for region : "
172            + regionInfo.getEncodedName() + " current FN servers:" + servers);
173      }
174
175      List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
176      for (ServerName sn : servers) {
177        if (sn.getStartcode() == NON_STARTCODE) {
178          serversWithNoStartCodes.add(sn);
179        } else {
180          serversWithNoStartCodes
181            .add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
182        }
183      }
184      regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
185    }
186
187    // Lets do a bulk update to meta since that reduces the RPC's
188    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
189      masterServices.getConnection());
190    deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());
191
192    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
193      RegionInfo regionInfo = entry.getKey();
194      List<ServerName> serversWithNoStartCodes = entry.getValue();
195      globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes);
196      addToReplicaLoad(regionInfo, serversWithNoStartCodes);
197    }
198  }
199
200  private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) {
201    ServerName serverToUse =
202      ServerName.valueOf(servers.get(PRIMARY.ordinal()).getAddress().toString(), NON_STARTCODE);
203    List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
204    if (regionList == null) {
205      regionList = new ArrayList<>();
206    }
207    regionList.add(hri);
208    primaryRSToRegionMap.put(serverToUse, regionList);
209
210    serverToUse = ServerName.valueOf(servers.get(SECONDARY.ordinal()).getAddress(), NON_STARTCODE);
211    regionList = secondaryRSToRegionMap.get(serverToUse);
212    if (regionList == null) {
213      regionList = new ArrayList<>();
214    }
215    regionList.add(hri);
216    secondaryRSToRegionMap.put(serverToUse, regionList);
217
218    serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getAddress(), NON_STARTCODE);
219    regionList = teritiaryRSToRegionMap.get(serverToUse);
220    if (regionList == null) {
221      regionList = new ArrayList<>();
222    }
223    regionList.add(hri);
224    teritiaryRSToRegionMap.put(serverToUse, regionList);
225  }
226
227  /*
228   * Get the replica count for the servers provided. For each server, replica count includes three
229   * counts for primary, secondary and tertiary. If a server is the primary favored node for 10
230   * regions, secondary for 5 and tertiary for 1, then the list would be [10, 5, 1]. If the server
231   * is newly added to the cluster is not a favored node for any region, the replica count would be
232   * [0, 0, 0].
233   */
234  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
235    Map<ServerName, List<Integer>> result = Maps.newHashMap();
236    for (ServerName sn : servers) {
237      ServerName serverWithNoStartCode = ServerName.valueOf(sn.getAddress(), NON_STARTCODE);
238      List<Integer> countList = Lists.newArrayList();
239      if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
240        countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size());
241      } else {
242        countList.add(0);
243      }
244      if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
245        countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size());
246      } else {
247        countList.add(0);
248      }
249      if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
250        countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size());
251      } else {
252        countList.add(0);
253      }
254      result.put(sn, countList);
255    }
256    return result;
257  }
258
259  public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) {
260    List<ServerName> favNodes = getFavoredNodes(regionInfo);
261    if (favNodes != null) {
262      if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) {
263        primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo);
264      }
265      if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) {
266        secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo);
267      }
268      if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) {
269        teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo);
270      }
271      globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo);
272    }
273  }
274
275  public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) {
276    for (RegionInfo regionInfo : regionInfoList) {
277      deleteFavoredNodesForRegion(regionInfo);
278    }
279  }
280
281  public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) {
282    Set<RegionInfo> regionInfos = Sets.newHashSet();
283
284    ServerName serverToUse = ServerName.valueOf(serverName.getAddress(), NON_STARTCODE);
285    if (primaryRSToRegionMap.containsKey(serverToUse)) {
286      regionInfos.addAll(primaryRSToRegionMap.get(serverToUse));
287    }
288    if (secondaryRSToRegionMap.containsKey(serverToUse)) {
289      regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse));
290    }
291    if (teritiaryRSToRegionMap.containsKey(serverToUse)) {
292      regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse));
293    }
294    return regionInfos;
295  }
296
297  public RackManager getRackManager() {
298    return rackManager;
299  }
300}