001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.favored;
020
021import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
022import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
025import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseIOException;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.RackManager;
041import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
042import org.apache.hadoop.hdfs.DFSConfigKeys;
043import org.apache.hadoop.hdfs.HdfsConfiguration;
044import org.apache.hadoop.net.NetUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
051import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
052
053/**
054 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and
055 * META table. Its the centralized store for all favored nodes information. All reads and updates
056 * should be done through this class. There should only be one instance of
057 * {@link FavoredNodesManager} in Master. {@link FavoredNodesPlan} and favored node information
058 * from {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except
059 * for may be tools that only read or test cases). All other classes including Favored balancers
060 * and {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
061 * read/write/deletes to favored nodes.
062 */
063@InterfaceAudience.Private
064public class FavoredNodesManager {
065
066  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
067
068  private FavoredNodesPlan globalFavoredNodesAssignmentPlan;
069  private Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
070  private Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
071  private Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap;
072
073  private MasterServices masterServices;
074  private RackManager rackManager;
075
076  /**
077   * Datanode port to be used for Favored Nodes.
078   */
079  private int datanodeDataTransferPort;
080
081  public FavoredNodesManager(MasterServices masterServices) {
082    this.masterServices = masterServices;
083    this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
084    this.primaryRSToRegionMap = new HashMap<>();
085    this.secondaryRSToRegionMap = new HashMap<>();
086    this.teritiaryRSToRegionMap = new HashMap<>();
087    this.rackManager = new RackManager(masterServices.getConfiguration());
088  }
089
090  public void initialize(SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment)
091      throws HBaseIOException {
092    globalFavoredNodesAssignmentPlan = snapshotOfRegionAssignment.getExistingAssignmentPlan();
093    primaryRSToRegionMap = snapshotOfRegionAssignment.getPrimaryToRegionInfoMap();
094    secondaryRSToRegionMap = snapshotOfRegionAssignment.getSecondaryToRegionInfoMap();
095    teritiaryRSToRegionMap = snapshotOfRegionAssignment.getTertiaryToRegionInfoMap();
096    datanodeDataTransferPort = getDataNodePort();
097  }
098
099  public int getDataNodePort() {
100    HdfsConfiguration.init();
101
102    Configuration dnConf = new HdfsConfiguration(masterServices.getConfiguration());
103
104    int dnPort = NetUtils.createSocketAddr(
105        dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
106            DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
107    LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
108    return dnPort;
109  }
110
111  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
112    return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
113  }
114
115  /*
116   * Favored nodes are not applicable for system tables. We will use this to check before
117   * we apply any favored nodes logic on a region.
118   */
119  public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) {
120    return !regionInfo.getTable().isSystemTable();
121  }
122
123  /**
124   * Filter and return regions for which favored nodes is not applicable.
125   *
126   * @param regions - collection of regions
127   * @return set of regions for which favored nodes is not applicable
128   */
129  public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) {
130    Set<RegionInfo> fnRegions = Sets.newHashSet();
131    for (RegionInfo regionInfo : regions) {
132      if (!isFavoredNodeApplicable(regionInfo)) {
133        fnRegions.add(regionInfo);
134      }
135    }
136    return fnRegions;
137  }
138
139  /*
140   * This should only be used when sending FN information to the region servers. Instead of
141   * sending the region server port, we use the datanode port. This helps in centralizing the DN
142   * port logic in Master. The RS uses the port from the favored node list as hints.
143   */
144  public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) {
145    if (getFavoredNodes(regionInfo) == null) {
146      return null;
147    }
148
149    List<ServerName> fnWithDNPort = Lists.newArrayList();
150    for (ServerName sn : getFavoredNodes(regionInfo)) {
151      fnWithDNPort.add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort,
152        NON_STARTCODE));
153    }
154    return fnWithDNPort;
155  }
156
157  public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
158      throws IOException {
159
160    Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
161    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
162      RegionInfo regionInfo = entry.getKey();
163      List<ServerName> servers = entry.getValue();
164
165      /*
166       * None of the following error conditions should happen. If it does, there is an issue with
167       * favored nodes generation or the regions its called on.
168       */
169      if (servers.size() != Sets.newHashSet(servers).size()) {
170        throw new IOException("Duplicates found: " + servers);
171      }
172
173      if (!isFavoredNodeApplicable(regionInfo)) {
174        throw new IOException("Can't update FN for a un-applicable region: "
175            + regionInfo.getRegionNameAsString() + " with " + servers);
176      }
177
178      if (servers.size() != FAVORED_NODES_NUM) {
179        throw new IOException("At least " + FAVORED_NODES_NUM
180            + " favored nodes should be present for region : " + regionInfo.getEncodedName()
181            + " current FN servers:" + servers);
182      }
183
184      List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
185      for (ServerName sn : servers) {
186        if (sn.getStartcode() == NON_STARTCODE) {
187          serversWithNoStartCodes.add(sn);
188        } else {
189          serversWithNoStartCodes.add(ServerName.valueOf(sn.getHostname(), sn.getPort(),
190              NON_STARTCODE));
191        }
192      }
193      regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
194    }
195
196    // Lets do a bulk update to meta since that reduces the RPC's
197    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(
198        regionToFavoredNodes,
199        masterServices.getConnection());
200    deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());
201
202    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
203      RegionInfo regionInfo = entry.getKey();
204      List<ServerName> serversWithNoStartCodes = entry.getValue();
205      globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes);
206      addToReplicaLoad(regionInfo, serversWithNoStartCodes);
207    }
208  }
209
210  private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) {
211    ServerName serverToUse = ServerName.valueOf(servers.get(PRIMARY.ordinal()).getHostAndPort(),
212        NON_STARTCODE);
213    List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
214    if (regionList == null) {
215      regionList = new ArrayList<>();
216    }
217    regionList.add(hri);
218    primaryRSToRegionMap.put(serverToUse, regionList);
219
220    serverToUse = ServerName
221        .valueOf(servers.get(SECONDARY.ordinal()).getHostAndPort(), NON_STARTCODE);
222    regionList = secondaryRSToRegionMap.get(serverToUse);
223    if (regionList == null) {
224      regionList = new ArrayList<>();
225    }
226    regionList.add(hri);
227    secondaryRSToRegionMap.put(serverToUse, regionList);
228
229    serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getHostAndPort(),
230      NON_STARTCODE);
231    regionList = teritiaryRSToRegionMap.get(serverToUse);
232    if (regionList == null) {
233      regionList = new ArrayList<>();
234    }
235    regionList.add(hri);
236    teritiaryRSToRegionMap.put(serverToUse, regionList);
237  }
238
239  /*
240   * Get the replica count for the servers provided.
241   *
242   * For each server, replica count includes three counts for primary, secondary and tertiary.
243   * If a server is the primary favored node for 10 regions, secondary for 5 and tertiary
244   * for 1, then the list would be [10, 5, 1]. If the server is newly added to the cluster is
245   * not a favored node for any region, the replica count would be [0, 0, 0].
246   */
247  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
248    Map<ServerName, List<Integer>> result = Maps.newHashMap();
249    for (ServerName sn : servers) {
250      ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), NON_STARTCODE);
251      List<Integer> countList = Lists.newArrayList();
252      if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
253        countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size());
254      } else {
255        countList.add(0);
256      }
257      if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
258        countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size());
259      } else {
260        countList.add(0);
261      }
262      if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
263        countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size());
264      } else {
265        countList.add(0);
266      }
267      result.put(sn, countList);
268    }
269    return result;
270  }
271
272  public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) {
273    List<ServerName> favNodes = getFavoredNodes(regionInfo);
274    if (favNodes != null) {
275      if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) {
276        primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo);
277      }
278      if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) {
279        secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo);
280      }
281      if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) {
282        teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo);
283      }
284      globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo);
285    }
286  }
287
288  public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) {
289    for (RegionInfo regionInfo : regionInfoList) {
290      deleteFavoredNodesForRegion(regionInfo);
291    }
292  }
293
294  @VisibleForTesting
295  public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) {
296    Set<RegionInfo> regionInfos = Sets.newHashSet();
297
298    ServerName serverToUse = ServerName.valueOf(serverName.getHostAndPort(), NON_STARTCODE);
299    if (primaryRSToRegionMap.containsKey(serverToUse)) {
300      regionInfos.addAll(primaryRSToRegionMap.get(serverToUse));
301    }
302    if (secondaryRSToRegionMap.containsKey(serverToUse)) {
303      regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse));
304    }
305    if (teritiaryRSToRegionMap.containsKey(serverToUse)) {
306      regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse));
307    }
308    return regionInfos;
309  }
310
311  public RackManager getRackManager() {
312    return rackManager;
313  }
314}