View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import com.google.common.cache.CacheBuilder;
21  import com.google.common.cache.CacheLoader;
22  import com.google.common.cache.LoadingCache;
23  import com.google.common.collect.Lists;
24  import com.google.common.util.concurrent.ListenableFuture;
25  import com.google.common.util.concurrent.ListeningExecutorService;
26  import com.google.common.util.concurrent.MoreExecutors;
27  import com.google.common.util.concurrent.ThreadFactoryBuilder;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.ClusterStatus;
32  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.ServerName;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.master.AssignmentManager;
39  import org.apache.hadoop.hbase.master.MasterServices;
40  import org.apache.hadoop.hbase.master.RegionStates;
41  import org.apache.hadoop.hbase.regionserver.HRegion;
42  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43  
44  import java.io.FileNotFoundException;
45  import java.io.IOException;
46  import java.util.ArrayList;
47  import java.util.Collection;
48  import java.util.HashMap;
49  import java.util.List;
50  import java.util.Set;
51  import java.util.concurrent.Callable;
52  import java.util.concurrent.ExecutionException;
53  import java.util.concurrent.Executors;
54  import java.util.concurrent.TimeUnit;
55  
56  /**
57   * This will find where data for a region is located in HDFS. It ranks
58   * {@link ServerName}'s by the size of the store files they are holding for a
59   * given region.
60   *
61   */
62  @InterfaceAudience.Private
63  class RegionLocationFinder {
64    private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
65    private static final long CACHE_TIME = 240 * 60 * 1000;
66    private Configuration conf;
67    private volatile ClusterStatus status;
68    private MasterServices services;
69    private final ListeningExecutorService executor;
70    private long lastFullRefresh = 0;
71  
72    private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
73        new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
74  
75          public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
76                 HDFSBlocksDistribution oldValue) throws Exception {
77            return executor.submit(new Callable<HDFSBlocksDistribution>() {
78              @Override
79              public HDFSBlocksDistribution call() throws Exception {
80                return internalGetTopBlockLocation(hri);
81              }
82            });
83          }
84  
85          @Override
86          public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
87            return internalGetTopBlockLocation(key);
88          }
89        };
90  
91    // The cache for where regions are located.
92    private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
93  
94    RegionLocationFinder() {
95      this.cache = createCache();
96      executor = MoreExecutors.listeningDecorator(
97          Executors.newScheduledThreadPool(
98              5,
99              new ThreadFactoryBuilder().
100                 setDaemon(true)
101                 .setNameFormat("region-location-%d")
102                 .build()));
103   }
104 
105   /**
106    * Create a cache for region to list of servers
107    * @param time time to cache the locations
108    * @return A new Cache.
109    */
110   private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
111     return CacheBuilder.newBuilder()
112         .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
113         .build(loader);
114   }
115 
116   public Configuration getConf() {
117     return conf;
118   }
119 
120   public void setConf(Configuration conf) {
121     this.conf = conf;
122   }
123 
124   public void setServices(MasterServices services) {
125     this.services = services;
126   }
127 
128   public void setClusterStatus(ClusterStatus status) {
129     long currentTime = EnvironmentEdgeManager.currentTime();
130     this.status = status;
131     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
132       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
133       lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
134     }
135 
136   }
137 
138   /**
139    * Refresh all the region locations.
140    *
141    * @return true if user created regions got refreshed.
142    */
143   private boolean scheduleFullRefresh() {
144     // Protect from anything being null while starting up.
145     if (services == null) {
146       return false;
147     }
148     AssignmentManager am = services.getAssignmentManager();
149 
150     if (am == null) {
151       return false;
152     }
153     RegionStates regionStates = am.getRegionStates();
154     if (regionStates == null) {
155       return false;
156     }
157 
158     Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
159     boolean includesUserTables = false;
160     for (final HRegionInfo hri : regions) {
161       cache.refresh(hri);
162       includesUserTables = includesUserTables || !hri.isSystemTable();
163     }
164     return includesUserTables;
165   }
166 
167   protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
168     HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
169     List<String> topHosts = blocksDistribution.getTopHosts();
170     return mapHostNameToServerName(topHosts);
171   }
172 
173   /**
174    * Returns an ordered list of hosts which have better locality for this region
175    * than the current host.
176    */
177   protected List<ServerName> getTopBlockLocations(HRegionInfo region, String currentHost) {
178     HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
179     List<String> topHosts = new ArrayList<String>();
180     for (String host : blocksDistribution.getTopHosts()) {
181       if (host.equals(currentHost)) {
182         break;
183       }
184       topHosts.add(host);
185     }
186     return mapHostNameToServerName(topHosts);
187   }
188 
189   /**
190    * Returns an ordered list of hosts that are hosting the blocks for this
191    * region. The weight of each host is the sum of the block lengths of all
192    * files on that host, so the first host in the list is the server which holds
193    * the most bytes of the given region's HFiles.
194    *
195    * @param region region
196    * @return ordered list of hosts holding blocks of the specified region
197    */
198   protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
199     try {
200       HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
201       if (tableDescriptor != null) {
202         HDFSBlocksDistribution blocksDistribution =
203             HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
204         return blocksDistribution;
205       }
206     } catch (IOException ioe) {
207       LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
208           + region.getEncodedName(), ioe);
209     }
210 
211     return new HDFSBlocksDistribution();
212   }
213 
214   /**
215    * return HTableDescriptor for a given tableName
216    *
217    * @param tableName the table name
218    * @return HTableDescriptor
219    * @throws IOException
220    */
221   protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
222     HTableDescriptor tableDescriptor = null;
223     try {
224       if (this.services != null && this.services.getTableDescriptors() != null) {
225         tableDescriptor = this.services.getTableDescriptors().get(tableName);
226       }
227     } catch (FileNotFoundException fnfe) {
228       LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
229           + tableName, fnfe);
230     }
231 
232     return tableDescriptor;
233   }
234 
235   /**
236    * Map hostname to ServerName, The output ServerName list will have the same
237    * order as input hosts.
238    *
239    * @param hosts the list of hosts
240    * @return ServerName list
241    */
242   protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
243     if (hosts == null || status == null) {
244       if (hosts == null) {
245         LOG.warn("RegionLocationFinder top hosts is null");
246       }
247       return Lists.newArrayList();
248     }
249 
250     List<ServerName> topServerNames = new ArrayList<ServerName>();
251     Collection<ServerName> regionServers = status.getServers();
252 
253     // create a mapping from hostname to ServerName for fast lookup
254     HashMap<String, List<ServerName>> hostToServerName = new HashMap<String, List<ServerName>>();
255     for (ServerName sn : regionServers) {
256       String host = sn.getHostname();
257       if (!hostToServerName.containsKey(host)) {
258         hostToServerName.put(host, new ArrayList<ServerName>());
259       }
260       hostToServerName.get(host).add(sn);
261     }
262 
263     for (String host : hosts) {
264       if (!hostToServerName.containsKey(host)) {
265         continue;
266       }
267       for (ServerName sn : hostToServerName.get(host)) {
268         // it is possible that HDFS is up ( thus host is valid ),
269         // but RS is down ( thus sn is null )
270         if (sn != null) {
271           topServerNames.add(sn);
272         }
273       }
274     }
275     return topServerNames;
276   }
277 
278   public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
279     HDFSBlocksDistribution blockDistbn = null;
280     try {
281       if (cache.asMap().containsKey(hri)) {
282         blockDistbn = cache.get(hri);
283         return blockDistbn;
284       } else {
285         LOG.debug("HDFSBlocksDistribution not found in cache for region "
286             + hri.getRegionNameAsString());
287         blockDistbn = internalGetTopBlockLocation(hri);
288         cache.put(hri, blockDistbn);
289         return blockDistbn;
290       }
291     } catch (ExecutionException e) {
292       LOG.warn("Error while fetching cache entry ", e);
293       blockDistbn = internalGetTopBlockLocation(hri);
294       cache.put(hri, blockDistbn);
295       return blockDistbn;
296     }
297   }
298 }