001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.favored;
019
020import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
021import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
022import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.getDataNodePort;
023import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
024import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
025import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.stream.Collectors;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
038import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
039import org.apache.yetus.audience.InterfaceAudience;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
042import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
043import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
044
045/**
046 * FavoredNodesManager is responsible for maintaining favored nodes info in internal cache and META
047 * table. Its the centralized store for all favored nodes information. All reads and updates should
048 * be done through this class. There should only be one instance of {@link FavoredNodesManager} in
049 * Master. {@link FavoredNodesPlan} and favored node information from
050 * {@link SnapshotOfRegionAssignmentFromMeta} should not be used outside this class (except for
051 * tools that only read or fortest cases). All other classes including Favored balancers and
052 * {@link FavoredNodeAssignmentHelper} should use {@link FavoredNodesManager} for any
053 * read/write/deletes to favored nodes.
054 */
055@InterfaceAudience.Private
056public class FavoredNodesManager {
057
058  private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
059  private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
060  private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
061  private final Map<ServerName, List<RegionInfo>> teritiaryRSToRegionMap;
062
063  private final ClusterInfoProvider provider;
064
065  /**
066   * Datanode port to be used for Favored Nodes.
067   */
068  private int datanodeDataTransferPort;
069
070  public FavoredNodesManager(ClusterInfoProvider provider) {
071    this.provider = provider;
072    this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
073    this.primaryRSToRegionMap = new HashMap<>();
074    this.secondaryRSToRegionMap = new HashMap<>();
075    this.teritiaryRSToRegionMap = new HashMap<>();
076  }
077
078  public void initializeFromMeta() throws IOException {
079    SnapshotOfRegionAssignmentFromMeta snapshot =
080      new SnapshotOfRegionAssignmentFromMeta(provider.getConnection());
081    snapshot.initialize();
082    // Add snapshot to structures made on creation. Current structures may have picked
083    // up data between construction and the scan of meta needed before this method
084    // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
085    synchronized (this) {
086      this.globalFavoredNodesAssignmentPlan
087        .updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
088      primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
089      secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
090      teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
091      datanodeDataTransferPort = getDataNodePort(provider.getConfiguration());
092    }
093  }
094
095  public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
096    return this.globalFavoredNodesAssignmentPlan.getFavoredNodes(regionInfo);
097  }
098
099  /**
100   * Favored nodes are not applicable for system tables. We will use this to check before we apply
101   * any favored nodes logic on a region.
102   */
103  public static boolean isFavoredNodeApplicable(RegionInfo regionInfo) {
104    return !regionInfo.getTable().isSystemTable();
105  }
106
107  /**
108   * Filter and return regions for which favored nodes is not applicable.
109   * @return set of regions for which favored nodes is not applicable
110   */
111  public static Set<RegionInfo> filterNonFNApplicableRegions(Collection<RegionInfo> regions) {
112    return regions.stream().filter(r -> !isFavoredNodeApplicable(r)).collect(Collectors.toSet());
113  }
114
115  /**
116   * This should only be used when sending FN information to the region servers. Instead of sending
117   * the region server port, we use the datanode port. This helps in centralizing the DN port logic
118   * in Master. The RS uses the port from the favored node list as hints.
119   */
120  public synchronized List<ServerName> getFavoredNodesWithDNPort(RegionInfo regionInfo) {
121    if (getFavoredNodes(regionInfo) == null) {
122      return null;
123    }
124
125    List<ServerName> fnWithDNPort = Lists.newArrayList();
126    for (ServerName sn : getFavoredNodes(regionInfo)) {
127      fnWithDNPort
128        .add(ServerName.valueOf(sn.getHostname(), datanodeDataTransferPort, NON_STARTCODE));
129    }
130    return fnWithDNPort;
131  }
132
133  public synchronized void updateFavoredNodes(Map<RegionInfo, List<ServerName>> regionFNMap)
134    throws IOException {
135    Map<RegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<>();
136    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionFNMap.entrySet()) {
137      RegionInfo regionInfo = entry.getKey();
138      List<ServerName> servers = entry.getValue();
139
140      /*
141       * None of the following error conditions should happen. If it does, there is an issue with
142       * favored nodes generation or the regions its called on.
143       */
144      if (servers.size() != Sets.newHashSet(servers).size()) {
145        throw new IOException("Duplicates found: " + servers);
146      }
147
148      if (!isFavoredNodeApplicable(regionInfo)) {
149        throw new IOException("Can't update FN for a un-applicable region: "
150          + regionInfo.getRegionNameAsString() + " with " + servers);
151      }
152
153      if (servers.size() != FAVORED_NODES_NUM) {
154        throw new IOException(
155          "At least " + FAVORED_NODES_NUM + " favored nodes should be present for region : "
156            + regionInfo.getEncodedName() + " current FN servers:" + servers);
157      }
158
159      List<ServerName> serversWithNoStartCodes = Lists.newArrayList();
160      for (ServerName sn : servers) {
161        if (sn.getStartcode() == NON_STARTCODE) {
162          serversWithNoStartCodes.add(sn);
163        } else {
164          serversWithNoStartCodes
165            .add(ServerName.valueOf(sn.getHostname(), sn.getPort(), NON_STARTCODE));
166        }
167      }
168      regionToFavoredNodes.put(regionInfo, serversWithNoStartCodes);
169    }
170
171    // Lets do a bulk update to meta since that reduces the RPC's
172    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
173      provider.getConnection());
174    deleteFavoredNodesForRegions(regionToFavoredNodes.keySet());
175
176    for (Map.Entry<RegionInfo, List<ServerName>> entry : regionToFavoredNodes.entrySet()) {
177      RegionInfo regionInfo = entry.getKey();
178      List<ServerName> serversWithNoStartCodes = entry.getValue();
179      globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(regionInfo, serversWithNoStartCodes);
180      addToReplicaLoad(regionInfo, serversWithNoStartCodes);
181    }
182  }
183
184  private synchronized void addToReplicaLoad(RegionInfo hri, List<ServerName> servers) {
185    ServerName serverToUse =
186      ServerName.valueOf(servers.get(PRIMARY.ordinal()).getAddress().toString(), NON_STARTCODE);
187    List<RegionInfo> regionList = primaryRSToRegionMap.get(serverToUse);
188    if (regionList == null) {
189      regionList = new ArrayList<>();
190    }
191    regionList.add(hri);
192    primaryRSToRegionMap.put(serverToUse, regionList);
193
194    serverToUse = ServerName.valueOf(servers.get(SECONDARY.ordinal()).getAddress(), NON_STARTCODE);
195    regionList = secondaryRSToRegionMap.get(serverToUse);
196    if (regionList == null) {
197      regionList = new ArrayList<>();
198    }
199    regionList.add(hri);
200    secondaryRSToRegionMap.put(serverToUse, regionList);
201
202    serverToUse = ServerName.valueOf(servers.get(TERTIARY.ordinal()).getAddress(), NON_STARTCODE);
203    regionList = teritiaryRSToRegionMap.get(serverToUse);
204    if (regionList == null) {
205      regionList = new ArrayList<>();
206    }
207    regionList.add(hri);
208    teritiaryRSToRegionMap.put(serverToUse, regionList);
209  }
210
211  /**
212   * Get the replica count for the servers provided.
213   * <p/>
214   * For each server, replica count includes three counts for primary, secondary and tertiary. If a
215   * server is the primary favored node for 10 regions, secondary for 5 and tertiary for 1, then the
216   * list would be [10, 5, 1]. If the server is newly added to the cluster is not a favored node for
217   * any region, the replica count would be [0, 0, 0].
218   */
219  public synchronized Map<ServerName, List<Integer>> getReplicaLoad(List<ServerName> servers) {
220    Map<ServerName, List<Integer>> result = Maps.newHashMap();
221    for (ServerName sn : servers) {
222      ServerName serverWithNoStartCode = ServerName.valueOf(sn.getAddress(), NON_STARTCODE);
223      List<Integer> countList = Lists.newArrayList();
224      if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
225        countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size());
226      } else {
227        countList.add(0);
228      }
229      if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
230        countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size());
231      } else {
232        countList.add(0);
233      }
234      if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) {
235        countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size());
236      } else {
237        countList.add(0);
238      }
239      result.put(sn, countList);
240    }
241    return result;
242  }
243
244  public synchronized void deleteFavoredNodesForRegion(RegionInfo regionInfo) {
245    List<ServerName> favNodes = getFavoredNodes(regionInfo);
246    if (favNodes != null) {
247      if (primaryRSToRegionMap.containsKey(favNodes.get(PRIMARY.ordinal()))) {
248        primaryRSToRegionMap.get(favNodes.get(PRIMARY.ordinal())).remove(regionInfo);
249      }
250      if (secondaryRSToRegionMap.containsKey(favNodes.get(SECONDARY.ordinal()))) {
251        secondaryRSToRegionMap.get(favNodes.get(SECONDARY.ordinal())).remove(regionInfo);
252      }
253      if (teritiaryRSToRegionMap.containsKey(favNodes.get(TERTIARY.ordinal()))) {
254        teritiaryRSToRegionMap.get(favNodes.get(TERTIARY.ordinal())).remove(regionInfo);
255      }
256      globalFavoredNodesAssignmentPlan.removeFavoredNodes(regionInfo);
257    }
258  }
259
260  public synchronized void deleteFavoredNodesForRegions(Collection<RegionInfo> regionInfoList) {
261    for (RegionInfo regionInfo : regionInfoList) {
262      deleteFavoredNodesForRegion(regionInfo);
263    }
264  }
265
266  public synchronized Set<RegionInfo> getRegionsOfFavoredNode(ServerName serverName) {
267    Set<RegionInfo> regionInfos = Sets.newHashSet();
268
269    ServerName serverToUse = ServerName.valueOf(serverName.getAddress(), NON_STARTCODE);
270    if (primaryRSToRegionMap.containsKey(serverToUse)) {
271      regionInfos.addAll(primaryRSToRegionMap.get(serverToUse));
272    }
273    if (secondaryRSToRegionMap.containsKey(serverToUse)) {
274      regionInfos.addAll(secondaryRSToRegionMap.get(serverToUse));
275    }
276    if (teritiaryRSToRegionMap.containsKey(serverToUse)) {
277      regionInfos.addAll(teritiaryRSToRegionMap.get(serverToUse));
278    }
279    return regionInfos;
280  }
281}