001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.favored;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.RackManager;
038import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
039import org.apache.hadoop.hdfs.DFSConfigKeys;
040import org.apache.hadoop.hdfs.HdfsConfiguration;
041import org.apache.hadoop.net.NetUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
047import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
048import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
049
050/**
051 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and
052 * META table. Its the centralized store for all favored nodes information. All reads and updates
053 * should be done through this class. There should only be one instance of
054 * {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information
055 * from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except
056 * for tools that only read or fortest cases). All other classes including Favored balancers
057 * and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
058 * read/write/deletes to favored nodes.
059 */
060@InterfaceAudience.Private
061public class FavoredNodesManager {
062  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
063
064  private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
065  private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
066  private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
067  private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap;
068
069  private final MasterServices masterServices;
070  private final RackManager rackManager;
071
072  /**
073   * Datanode port to be used for Favored Nodes.
074   */
075  private int datanodeDataTransferPort;
076
077  public FavoredNodesManager(MasterServices masterServices) {
078    this.masterServices = masterServices;
079    this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
080    this.primaryRSToRegionMap = new HashMap<>();
081    this.secondaryRSToRegionMap = new HashMap<>();
082    this.teritiaryRSToRegionMap = new HashMap<>();
083    this.rackManager = new RackManager(masterServices.getConfiguration());
084  }
085
086  public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) {
087    // Add snapshot to structures made on creation. Current structures may have picked
088    // up data between construction and the scan of meta needed before this method
089    // is called.  See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
090    this.globalFavoredNodesAssignmentPlan.
091      updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
092    primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
093    secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
094    teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
095    datanodeDataTransferPort= getDataNodePort();
096  }
097
098  @VisibleForTesting
099  public int getDataNodePort() {
100    HdfsConfiguration.init();
101
102    Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
103
104    int dnPort = NetUtils.createSocketAddr(
105        dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
106            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
107    LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
108    return dnPort;
109  }
110
111  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
112    return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
113  }
114
115  /*
116   * Favored nodes are not applicable for system tables. We will use this to check before
117   * we apply any favored nodes logic on a region.
118   */
119  public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) {
120    return !regionInfo.getTable().isSystemTable();
121  }
122
123  /**
124   * Filter and return regions for which favored nodes is not applicable.
125   * @return set of regions for which favored nodes is not applicable
126   */
127  public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) {
128    return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet());
129  }
130
131  /*
132   * This should only be used when sending FN information to the region servers. Instead of
133   * sending the region server port, we use the datanode port. This helps in centralizing the DN
134   * port logic in Master. The RS uses the port from the favored node list as hints.
135   */
136  public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) {
137    if (getFavoredNodes(regionInfo) == null) {
138      return null;
139    }
140
141    List<ServerName> fnWithDNPort = Lists.newArrayList();
142    for (ServerName sn : getFavoredNodes(regionInfo)) {
143      fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
144        NON_STARTCODE));
145    }
146    return fnWithDNPort;
147  }
148
149  public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
150      throws IOException {
151
152    Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
153    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
154      RegionInfo regionInfo = entry.getKey();
155      List<ServerName> servers = entry.getValue();
156
157      /*
158       * None of the following error conditions should happen. If it does, there is an issue with
159       * favored nodes generation or the regions its called on.
160       */
161      if (servers.size() != Sets.newHashSet(servers).size()) {
162        throw new IOException("Duplicates found: " + servers);
163      }
164
165      if (!isFavoredNodeApplicable(regionInfo)) {
166        throw new IOException("Can't update FN for a un-applicable region: "
167            + regionInfo.getRegionNameAsString() + " with " + servers);
168      }
169
170      if (servers.size() != FAVORED_NODES_NUM) {
171        throw new IOException("At least " + FAVORED_NODES_NUM
172            + " favored nodes should be present for region : " + regionInfo.getEncodedName()
173            + " current FN servers:" + servers);
174      }
175
176      List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
177      for (ServerName sn : servers) {
178        if (sn.getStartcode() == NON_STARTCODE) {
179          serversWithNoStartCodes.add(sn);
180        } else {
181          serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
182              NON_STARTCODE));
183        }
184      }
185      regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
186    }
187
188    // Lets do a bulk update to meta since that reduces the RPC's
189    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
190        masterServices.getConnection());
191    deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());
192
193    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
194      RegionInfo regionInfo = entry.getKey();
195      List<ServerName> serversWithNoStartCodes = entry.getValue();
196      globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes);
197      addToReplicaLoad(regionInfo, serversWithNoStartCodes);
198    }
199  }
200
201  private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) {
202    ServerName serverToUse =
203      ServerName.valueOf(servers.get(PRIMARY.ordinal()).getAddress().toString(), NON_STARTCODE);
204    List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
205    if (regionList == null) {
206      regionList = new ArrayList<>();
207    }
208    regionList.add(hri);
209    primaryRSToRegionMap.put(serverToUse, regionList);
210
211    serverToUse = ServerName
212        .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE);
213    regionList = secondaryRSToRegionMap.get(serverToUse);
214    if (regionList == null) {
215      regionList = new ArrayList<>();
216    }
217    regionList.add(hri);
218    secondaryRSToRegionMap.put(serverToUse, regionList);
219
220    serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(),
221      NON_STARTCODE);
222    regionList = teritiaryRSToRegionMap.get(serverToUse);
223    if (regionList == null) {
224      regionList = new ArrayList<>();
225    }
226    regionList.add(hri);
227    teritiaryRSToRegionMap.put(serverToUse, regionList);
228  }
229
230  /*
231   * Get the replica count for the servers provided.
232   *
233   * For each server, replica count includes three counts for primary, secondary and tertiary.
234   * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
235   * for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is
236   * not a favored node for any region, the replica count would be [0, 0, 0].
237   */
238  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
239    Map<ServerName, List<Integer>> result = Maps.newHashMap();
240    for (ServerName sn : servers) {
241      ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), NON_STARTCODE);
242      List<Integer> countList = Lists.newArrayList();
243      if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
244        countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size());
245      } else {
246        countList.add(0);
247      }
248      if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
249        countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size());
250      } else {
251        countList.add(0);
252      }
253      if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
254        countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size());
255      } else {
256        countList.add(0);
257      }
258      result.put(sn, countList);
259    }
260    return result;
261  }
262
263  public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) {
264    List<ServerName> favNodes = getFavoredNodes(regionInfo);
265    if (favNodes != null) {
266      if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) {
267        primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo);
268      }
269      if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) {
270        secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo);
271      }
272      if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) {
273        teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo);
274      }
275      globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo);
276    }
277  }
278
279  public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) {
280    for (RegionInfo regionInfo : regionInfoList) {
281      deleteFavoredNodesForRegion(regionInfo);
282    }
283  }
284
285  @VisibleForTesting
286  public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) {
287    Set<RegionInfo> regionInfos = Sets.newHashSet();
288
289    ServerName serverToUse = ServerName.valueOf(serverName.getHostAndPort(), NON_STARTCODE);
290    if (primaryRSToRegionMap.containsKey(serverToUse)) {
291      regionInfos.addAll(primaryRSToRegionMap.get(serverToUse));
292    }
293    if (secondaryRSToRegionMap.containsKey(serverToUse)) {
294      regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse));
295    }
296    if (teritiaryRSToRegionMap.containsKey(serverToUse)) {
297      regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse));
298    }
299    return regionInfos;
300  }
301
302  public RackManager getRackManager() {
303    return rackManager;
304  }
305}