1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import com.google.common.cache.CacheBuilder;
21 import com.google.common.cache.CacheLoader;
22 import com.google.common.cache.LoadingCache;
23 import com.google.common.collect.Lists;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.ListeningExecutorService;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.ThreadFactoryBuilder;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.ClusterStatus;
32 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.master.AssignmentManager;
39 import org.apache.hadoop.hbase.master.MasterServices;
40 import org.apache.hadoop.hbase.master.RegionStates;
41 import org.apache.hadoop.hbase.regionserver.HRegion;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43
44 import java.io.FileNotFoundException;
45 import java.io.IOException;
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.HashMap;
49 import java.util.List;
50 import java.util.Set;
51 import java.util.concurrent.Callable;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.Executors;
54 import java.util.concurrent.TimeUnit;
55
56
57
58
59
60
61
62 @InterfaceAudience.Private
63 class RegionLocationFinder {
64 private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
65 private static final long CACHE_TIME = 240 * 60 * 1000;
66 private Configuration conf;
67 private volatile ClusterStatus status;
68 private MasterServices services;
69 private final ListeningExecutorService executor;
70 private long lastFullRefresh = 0;
71
72 private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
73 new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
74
75 public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
76 HDFSBlocksDistribution oldValue) throws Exception {
77 return executor.submit(new Callable<HDFSBlocksDistribution>() {
78 @Override
79 public HDFSBlocksDistribution call() throws Exception {
80 return internalGetTopBlockLocation(hri);
81 }
82 });
83 }
84
85 @Override
86 public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
87 return internalGetTopBlockLocation(key);
88 }
89 };
90
91
92 private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
93
94 RegionLocationFinder() {
95 this.cache = createCache();
96 executor = MoreExecutors.listeningDecorator(
97 Executors.newScheduledThreadPool(
98 5,
99 new ThreadFactoryBuilder().
100 setDaemon(true)
101 .setNameFormat("region-location-%d")
102 .build()));
103 }
104
105
106
107
108
109
110 private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
111 return CacheBuilder.newBuilder()
112 .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
113 .build(loader);
114 }
115
116 public Configuration getConf() {
117 return conf;
118 }
119
120 public void setConf(Configuration conf) {
121 this.conf = conf;
122 }
123
124 public void setServices(MasterServices services) {
125 this.services = services;
126 }
127
128 public void setClusterStatus(ClusterStatus status) {
129 long currentTime = EnvironmentEdgeManager.currentTime();
130 this.status = status;
131 if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
132
133 lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
134 }
135
136 }
137
138
139
140
141
142
143 private boolean scheduleFullRefresh() {
144
145 if (services == null) {
146 return false;
147 }
148 AssignmentManager am = services.getAssignmentManager();
149
150 if (am == null) {
151 return false;
152 }
153 RegionStates regionStates = am.getRegionStates();
154 if (regionStates == null) {
155 return false;
156 }
157
158 Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
159 boolean includesUserTables = false;
160 for (final HRegionInfo hri : regions) {
161 cache.refresh(hri);
162 includesUserTables = includesUserTables || !hri.isSystemTable();
163 }
164 return includesUserTables;
165 }
166
167 protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
168 HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
169 List<String> topHosts = blocksDistribution.getTopHosts();
170 return mapHostNameToServerName(topHosts);
171 }
172
173
174
175
176
177
178
179
180
181
182 protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
183 try {
184 HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
185 if (tableDescriptor != null) {
186 HDFSBlocksDistribution blocksDistribution =
187 HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
188 return blocksDistribution;
189 }
190 } catch (IOException ioe) {
191 LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
192 + region.getEncodedName(), ioe);
193 }
194
195 return new HDFSBlocksDistribution();
196 }
197
198
199
200
201
202
203
204
205 protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
206 HTableDescriptor tableDescriptor = null;
207 try {
208 if (this.services != null && this.services.getTableDescriptors() != null) {
209 tableDescriptor = this.services.getTableDescriptors().get(tableName);
210 }
211 } catch (FileNotFoundException fnfe) {
212 LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
213 + tableName, fnfe);
214 }
215
216 return tableDescriptor;
217 }
218
219
220
221
222
223
224
225
226 protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
227 if (hosts == null || status == null) {
228 if (hosts == null) {
229 LOG.warn("RegionLocationFinder top hosts is null");
230 }
231 return Lists.newArrayList();
232 }
233
234 List<ServerName> topServerNames = new ArrayList<ServerName>();
235 Collection<ServerName> regionServers = status.getServers();
236
237
238 HashMap<String, List<ServerName>> hostToServerName = new HashMap<String, List<ServerName>>();
239 for (ServerName sn : regionServers) {
240 String host = sn.getHostname();
241 if (!hostToServerName.containsKey(host)) {
242 hostToServerName.put(host, new ArrayList<ServerName>());
243 }
244 hostToServerName.get(host).add(sn);
245 }
246
247 for (String host : hosts) {
248 if (!hostToServerName.containsKey(host)) {
249 continue;
250 }
251 for (ServerName sn : hostToServerName.get(host)) {
252
253
254 if (sn != null) {
255 topServerNames.add(sn);
256 }
257 }
258 }
259 return topServerNames;
260 }
261
262 public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
263 HDFSBlocksDistribution blockDistbn = null;
264 try {
265 if (cache.asMap().containsKey(hri)) {
266 blockDistbn = cache.get(hri);
267 return blockDistbn;
268 } else {
269 LOG.debug("HDFSBlocksDistribution not found in cache for region "
270 + hri.getRegionNameAsString());
271 blockDistbn = internalGetTopBlockLocation(hri);
272 cache.put(hri, blockDistbn);
273 return blockDistbn;
274 }
275 } catch (ExecutionException e) {
276 LOG.warn("Error while fetching cache entry ", e);
277 blockDistbn = internalGetTopBlockLocation(hri);
278 cache.put(hri, blockDistbn);
279 return blockDistbn;
280 }
281 }
282 }