View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import com.google.common.cache.CacheBuilder;
21  import com.google.common.cache.CacheLoader;
22  import com.google.common.cache.LoadingCache;
23  import com.google.common.collect.Lists;
24  import com.google.common.util.concurrent.ListenableFuture;
25  import com.google.common.util.concurrent.ListeningExecutorService;
26  import com.google.common.util.concurrent.MoreExecutors;
27  import com.google.common.util.concurrent.ThreadFactoryBuilder;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.ClusterStatus;
32  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.ServerName;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.master.AssignmentManager;
39  import org.apache.hadoop.hbase.master.MasterServices;
40  import org.apache.hadoop.hbase.master.RegionStates;
41  import org.apache.hadoop.hbase.regionserver.HRegion;
42  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43  
44  import java.io.FileNotFoundException;
45  import java.io.IOException;
46  import java.util.ArrayList;
47  import java.util.Collection;
48  import java.util.HashMap;
49  import java.util.List;
50  import java.util.Set;
51  import java.util.concurrent.Callable;
52  import java.util.concurrent.ExecutionException;
53  import java.util.concurrent.Executors;
54  import java.util.concurrent.TimeUnit;
55  
56  /**
57   * This will find where data for a region is located in HDFS. It ranks
58   * {@link ServerName}'s by the size of the store files they are holding for a
59   * given region.
60   *
61   */
62  @InterfaceAudience.Private
63  class RegionLocationFinder {
64    private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
65    private static final long CACHE_TIME = 240 * 60 * 1000;
66    private Configuration conf;
67    private volatile ClusterStatus status;
68    private MasterServices services;
69    private final ListeningExecutorService executor;
70    private long lastFullRefresh = 0;
71  
72    private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
73        new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
74  
75          public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
76                 HDFSBlocksDistribution oldValue) throws Exception {
77            return executor.submit(new Callable<HDFSBlocksDistribution>() {
78              @Override
79              public HDFSBlocksDistribution call() throws Exception {
80                return internalGetTopBlockLocation(hri);
81              }
82            });
83          }
84  
85          @Override
86          public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
87            return internalGetTopBlockLocation(key);
88          }
89        };
90  
91    // The cache for where regions are located.
92    private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
93  
94    RegionLocationFinder() {
95      this.cache = createCache();
96      executor = MoreExecutors.listeningDecorator(
97          Executors.newScheduledThreadPool(
98              5,
99              new ThreadFactoryBuilder().
100                 setDaemon(true)
101                 .setNameFormat("region-location-%d")
102                 .build()));
103   }
104 
105   /**
106    * Create a cache for region to list of servers
107    * @param time time to cache the locations
108    * @return A new Cache.
109    */
110   private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
111     return CacheBuilder.newBuilder()
112         .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
113         .build(loader);
114   }
115 
116   public Configuration getConf() {
117     return conf;
118   }
119 
120   public void setConf(Configuration conf) {
121     this.conf = conf;
122   }
123 
124   public void setServices(MasterServices services) {
125     this.services = services;
126   }
127 
128   public void setClusterStatus(ClusterStatus status) {
129     long currentTime = EnvironmentEdgeManager.currentTime();
130     this.status = status;
131     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
132       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
133       lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
134     }
135 
136   }
137 
138   /**
139    * Refresh all the region locations.
140    *
141    * @return true if user created regions got refreshed.
142    */
143   private boolean scheduleFullRefresh() {
144     // Protect from anything being null while starting up.
145     if (services == null) {
146       return false;
147     }
148     AssignmentManager am = services.getAssignmentManager();
149 
150     if (am == null) {
151       return false;
152     }
153     RegionStates regionStates = am.getRegionStates();
154     if (regionStates == null) {
155       return false;
156     }
157 
158     Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
159     boolean includesUserTables = false;
160     for (final HRegionInfo hri : regions) {
161       cache.refresh(hri);
162       includesUserTables = includesUserTables || !hri.isSystemTable();
163     }
164     return includesUserTables;
165   }
166 
167   protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
168     HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
169     List<String> topHosts = blocksDistribution.getTopHosts();
170     return mapHostNameToServerName(topHosts);
171   }
172 
173   /**
174    * Returns an ordered list of hosts that are hosting the blocks for this
175    * region. The weight of each host is the sum of the block lengths of all
176    * files on that host, so the first host in the list is the server which holds
177    * the most bytes of the given region's HFiles.
178    *
179    * @param region region
180    * @return ordered list of hosts holding blocks of the specified region
181    */
182   protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
183     try {
184       HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
185       if (tableDescriptor != null) {
186         HDFSBlocksDistribution blocksDistribution =
187             HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
188         return blocksDistribution;
189       }
190     } catch (IOException ioe) {
191       LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
192           + region.getEncodedName(), ioe);
193     }
194 
195     return new HDFSBlocksDistribution();
196   }
197 
198   /**
199    * return HTableDescriptor for a given tableName
200    *
201    * @param tableName the table name
202    * @return HTableDescriptor
203    * @throws IOException
204    */
205   protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
206     HTableDescriptor tableDescriptor = null;
207     try {
208       if (this.services != null && this.services.getTableDescriptors() != null) {
209         tableDescriptor = this.services.getTableDescriptors().get(tableName);
210       }
211     } catch (FileNotFoundException fnfe) {
212       LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
213           + tableName, fnfe);
214     }
215 
216     return tableDescriptor;
217   }
218 
219   /**
220    * Map hostname to ServerName, The output ServerName list will have the same
221    * order as input hosts.
222    *
223    * @param hosts the list of hosts
224    * @return ServerName list
225    */
226   protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
227     if (hosts == null || status == null) {
228       if (hosts == null) {
229         LOG.warn("RegionLocationFinder top hosts is null");
230       }
231       return Lists.newArrayList();
232     }
233 
234     List<ServerName> topServerNames = new ArrayList<ServerName>();
235     Collection<ServerName> regionServers = status.getServers();
236 
237     // create a mapping from hostname to ServerName for fast lookup
238     HashMap<String, List<ServerName>> hostToServerName = new HashMap<String, List<ServerName>>();
239     for (ServerName sn : regionServers) {
240       String host = sn.getHostname();
241       if (!hostToServerName.containsKey(host)) {
242         hostToServerName.put(host, new ArrayList<ServerName>());
243       }
244       hostToServerName.get(host).add(sn);
245     }
246 
247     for (String host : hosts) {
248       if (!hostToServerName.containsKey(host)) {
249         continue;
250       }
251       for (ServerName sn : hostToServerName.get(host)) {
252         // it is possible that HDFS is up ( thus host is valid ),
253         // but RS is down ( thus sn is null )
254         if (sn != null) {
255           topServerNames.add(sn);
256         }
257       }
258     }
259     return topServerNames;
260   }
261 
262   public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
263     HDFSBlocksDistribution blockDistbn = null;
264     try {
265       if (cache.asMap().containsKey(hri)) {
266         blockDistbn = cache.get(hri);
267         return blockDistbn;
268       } else {
269         LOG.debug("HDFSBlocksDistribution not found in cache for region "
270             + hri.getRegionNameAsString());
271         blockDistbn = internalGetTopBlockLocation(hri);
272         cache.put(hri, blockDistbn);
273         return blockDistbn;
274       }
275     } catch (ExecutionException e) {
276       LOG.warn("Error while fetching cache entry ", e);
277       blockDistbn = internalGetTopBlockLocation(hri);
278       cache.put(hri, blockDistbn);
279       return blockDistbn;
280     }
281   }
282 }