/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This will find where data for a region is located in HDFS. It ranks
 * {@link ServerName}s by the size of the store files they are holding for a
 * given region.
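 * <p>
 * A minimal usage sketch; the caller and variable names are illustrative, not
 * part of this class, and assume code running in the master (as the balancer
 * does):
 * <pre>
 *   RegionLocationFinder finder = new RegionLocationFinder();
 *   finder.setConf(conf);
 *   finder.setServices(masterServices);
 *   finder.setClusterMetrics(clusterMetrics);
 *   // Hosts holding the most bytes of the region's HFiles come first.
 *   List&lt;ServerName&gt; topServers = finder.getTopBlockLocations(regionInfo);
 * </pre>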
 */
@InterfaceAudience.Private
class RegionLocationFinder {
  private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class);
  // Cache entries expire after four hours.
  private static final long CACHE_TIME = 240 * 60 * 1000;
  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
      new HDFSBlocksDistribution();
  private Configuration conf;
  private volatile ClusterMetrics status;
  private MasterServices services;
  private final ListeningExecutorService executor;
  // Do not scheduleFullRefresh at master startup
  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();

  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
      new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {

        @Override
        public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
            HDFSBlocksDistribution oldValue) throws Exception {
          return executor.submit(new Callable<HDFSBlocksDistribution>() {
            @Override
            public HDFSBlocksDistribution call() throws Exception {
              return internalGetTopBlockLocation(hri);
            }
          });
        }

        @Override
        public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
          return internalGetTopBlockLocation(key);
        }
      };

  // The cache for where regions are located.
  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;

  RegionLocationFinder() {
    this.cache = createCache();
    executor = MoreExecutors.listeningDecorator(
        Executors.newScheduledThreadPool(
            5,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("region-location-%d")
                .build()));
  }

  /**
   * Create a cache that maps a region to its HDFSBlocksDistribution.
   * @return A new Cache.
   */
  private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
    return CacheBuilder.newBuilder()
        .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
        .build(loader);
  }

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public void setServices(MasterServices services) {
    this.services = services;
  }

  public void setClusterMetrics(ClusterMetrics status) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    this.status = status;
    if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
      // Only count the refresh if it includes user tables (i.e. more than meta and namespace).
      lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
    }
  }

  /**
   * Refresh all the region locations.
   * @return true if user created regions got refreshed.
   */
  private boolean scheduleFullRefresh() {
    // Protect from anything being null while starting up.
    if (services == null) {
      return false;
    }

    final AssignmentManager am = services.getAssignmentManager();
    if (am == null) {
      return false;
    }

    // TODO: Should this refresh all the regions or only the ones assigned?
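    // Note: cache.refresh() goes through CacheLoader.reload() above, which hands the
    // recomputation to the executor and returns immediately; the stale distribution
    // keeps being served until the new one is ready, so this loop does not block the caller.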
    boolean includesUserTables = false;
    for (final RegionInfo hri : am.getAssignedRegions()) {
      cache.refresh(hri);
      includesUserTables = includesUserTables || !hri.getTable().isSystemTable();
    }
    return includesUserTables;
  }

  protected List<ServerName> getTopBlockLocations(RegionInfo region) {
    List<String> topHosts = getBlockDistribution(region).getTopHosts();
    return mapHostNameToServerName(topHosts);
  }

  /**
   * Returns an ordered list of hosts which have better locality for this region
   * than the current host.
   */
  protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) {
    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
    List<String> topHosts = new ArrayList<>();
    for (String host : blocksDistribution.getTopHosts()) {
      if (host.equals(currentHost)) {
        break;
      }
      topHosts.add(host);
    }
    return mapHostNameToServerName(topHosts);
  }

  /**
   * Returns the block distribution for this region's store files. Each host is
   * weighted by the sum of the block lengths it holds, so the first entry among
   * the distribution's top hosts is the server holding the most bytes of the
   * given region's HFiles.
   *
   * @param region region
   * @return the HDFSBlocksDistribution for the region, or an empty distribution on error
   */
  protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
    try {
      TableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
      if (tableDescriptor != null) {
        HDFSBlocksDistribution blocksDistribution =
            HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
        return blocksDistribution;
      }
    } catch (IOException ioe) {
      LOG.warn("IOException during HDFSBlocksDistribution computation for region = "
          + region.getEncodedName(), ioe);
    }

    return EMPTY_BLOCK_DISTRIBUTION;
  }

  /**
   * Returns the TableDescriptor for the given table.
   *
   * @param tableName the table name
   * @return the TableDescriptor, or null if it is unavailable
   * @throws IOException if reading the table descriptor fails
   */
  protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
    TableDescriptor tableDescriptor = null;
    try {
      if (this.services != null && this.services.getTableDescriptors() != null) {
        tableDescriptor = this.services.getTableDescriptors().get(tableName);
      }
    } catch (FileNotFoundException fnfe) {
      LOG.debug("tableName={}", tableName, fnfe);
    }

    return tableDescriptor;
  }

  /**
   * Map each hostname to its ServerNames. The output list preserves the same
   * order as the input hosts.
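   * <p>
   * For example (hypothetical cluster): given hosts {@code [hostA, hostB]}
   * where {@code hostA} runs two region servers, the result lists both of
   * {@code hostA}'s ServerNames first, followed by {@code hostB}'s; hosts
   * with no live region server are skipped.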
   *
   * @param hosts the list of hosts
   * @return ServerName list
   */
  protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
    if (hosts == null || status == null) {
      if (hosts == null) {
        LOG.warn("RegionLocationFinder top hosts is null");
      }
      return Lists.newArrayList();
    }

    List<ServerName> topServerNames = new ArrayList<>();
    Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();

    // create a mapping from hostname to ServerName for fast lookup
    HashMap<String, List<ServerName>> hostToServerName = new HashMap<>();
    for (ServerName sn : regionServers) {
      String host = sn.getHostname();
      if (!hostToServerName.containsKey(host)) {
        hostToServerName.put(host, new ArrayList<>());
      }
      hostToServerName.get(host).add(sn);
    }

    for (String host : hosts) {
      if (!hostToServerName.containsKey(host)) {
        continue;
      }
      for (ServerName sn : hostToServerName.get(host)) {
        // it is possible that HDFS is up (thus host is valid),
        // but the RS is down (thus sn is null)
        if (sn != null) {
          topServerNames.add(sn);
        }
      }
    }
    return topServerNames;
  }

  /**
   * Return the cached distribution for the region, computing and caching it
   * synchronously on a cache miss or on a cache read failure.
   */
  public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
    HDFSBlocksDistribution blockDistbn = null;
    try {
      if (cache.asMap().containsKey(hri)) {
        blockDistbn = cache.get(hri);
        return blockDistbn;
      } else {
        LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString());
        blockDistbn = internalGetTopBlockLocation(hri);
        cache.put(hri, blockDistbn);
        return blockDistbn;
      }
    } catch (ExecutionException e) {
      LOG.warn("Error while fetching cache entry", e);
      blockDistbn = internalGetTopBlockLocation(hri);
      cache.put(hri, blockDistbn);
      return blockDistbn;
    }
  }

  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) {
    try {
      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
    } catch (Exception e) {
      return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
    }
  }

  /**
   * Kick off an asynchronous recomputation for each region, then block until
   * every result has been stored in the cache.
   */
  public void refreshAndWait(Collection<RegionInfo> hris) {
    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
        new ArrayList<>(hris.size());
    for (RegionInfo hregionInfo : hris) {
      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
    }
    int index = 0;
    LOG.info("Refreshing block distribution cache for {} regions (can take a while on a big cluster)",
      hris.size());
    for (RegionInfo hregionInfo : hris) {
      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
      try {
        cache.put(hregionInfo, future.get());
      } catch (InterruptedException ite) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException ee) {
        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = "
            + hregionInfo.getEncodedName(), ee);
      }
      index++;
    }
    LOG.info("Finished refreshing block distribution cache for {} regions", hris.size());
  }

  // For test
  LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
    return cache;
  }
}