/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This will find where data for a region is located in HDFS. It ranks {@link ServerName}s by the
 * size of the store files they are holding for a given region.
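 * <p>
 * A minimal usage sketch (the {@code provider} and {@code clusterMetrics} values are
 * illustrative; how they are obtained is up to the caller):
 *
 * <pre>
 * RegionHDFSBlockLocationFinder finder = new RegionHDFSBlockLocationFinder();
 * finder.setClusterInfoProvider(provider); // a ClusterInfoProvider implementation
 * finder.setClusterMetrics(clusterMetrics); // refresh periodically with current ClusterMetrics
 * List&lt;ServerName&gt; topServers = finder.getTopBlockLocations(regionInfo);
 * </pre>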
 */
@InterfaceAudience.Private
class RegionHDFSBlockLocationFinder extends Configured {
  private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
  // Cache entries expire 240 minutes after they were written.
  private static final long CACHE_TIME = 240 * 60 * 1000;
  // Minimum locality delta that triggers a cache refresh for a region.
  private static final float EPSILON = 0.0001f;
  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
    new HDFSBlocksDistribution();
  private volatile ClusterMetrics status;
  private volatile ClusterInfoProvider provider;
  private final ListeningExecutorService executor;
  // Do not scheduleFullRefresh at master startup
  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();

  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
    new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {

      @Override
      public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
        HDFSBlocksDistribution oldValue) throws Exception {
        return executor.submit(new Callable<HDFSBlocksDistribution>() {
          @Override
          public HDFSBlocksDistribution call() throws Exception {
            return internalGetTopBlockLocation(hri);
          }
        });
      }

      @Override
      public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
        return internalGetTopBlockLocation(key);
      }
    };

  // The cache for where regions are located.
  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;

  RegionHDFSBlockLocationFinder() {
    this.cache = createCache();
    executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("region-location-%d").build()));
  }

  /**
   * Create a cache that maps a region to the list of servers holding its blocks.
   * @return A new Cache.
   */
  private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
    return CacheBuilder.newBuilder().expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
      .build(loader);
  }

  void setClusterInfoProvider(ClusterInfoProvider provider) {
    this.provider = provider;
  }

  void setClusterMetrics(ClusterMetrics status) {
    long currentTime = EnvironmentEdgeManager.currentTime();

    if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
      this.status = status;
      // Only count the refresh if it includes user tables (i.e. more than meta and namespace).
      lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
    } else {
      refreshLocalityChangedRegions(this.status, status);
      this.status = status;
    }
  }

  /**
   * If locality for a region has changed, that almost certainly means our cache is out of date.
   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
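   * A region counts as changed when the data locality reported by its hosting server differs from
   * the previous report by more than {@link #EPSILON}; a region with no previous report (old
   * locality of -1) therefore always triggers a refresh.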
   */
  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
    if (oldStatus == null || newStatus == null) {
      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus,
        newStatus);
      return;
    }

    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();

    // Index the cached regions by encoded name so region metrics can be matched back to them.
    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
    for (RegionInfo regionInfo : cache.asMap().keySet()) {
      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
    }

    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
        RegionInfo region = regionsByName.get(encodedName);
        if (region == null) {
          continue;
        }

        float newLocality = regionEntry.getValue().getDataLocality();
        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);

        if (Math.abs(newLocality - oldLocality) > EPSILON) {
          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
            region.getEncodedName(), oldLocality, newLocality);
          cache.refresh(region);
        }
      }
    }
  }

  private float getOldLocality(ServerName newServer, byte[] regionName,
    Map<ServerName, ServerMetrics> oldServers) {
    ServerMetrics serverMetrics = oldServers.get(newServer);
    if (serverMetrics == null) {
      return -1f;
    }
    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
    if (regionMetrics == null) {
      return -1f;
    }

    return regionMetrics.getDataLocality();
  }

  /**
   * Refresh all the region locations.
   * @return true if user-created regions got refreshed.
   */
  private boolean scheduleFullRefresh() {
    ClusterInfoProvider service = this.provider;
    // Protect from anything being null while starting up.
    if (service == null) {
      return false;
    }

    // TODO: Should this refresh all the regions or only the ones assigned?
    boolean includesUserTables = false;
    for (final RegionInfo hri : service.getAssignedRegions()) {
      cache.refresh(hri);
      includesUserTables |= !hri.getTable().isSystemTable();
    }
    return includesUserTables;
  }

  List<ServerName> getTopBlockLocations(RegionInfo region) {
    List<String> topHosts = getBlockDistribution(region).getTopHosts();
    return mapHostNameToServerName(topHosts);
  }

  /**
   * Computes the {@link HDFSBlocksDistribution} for the given region. The weight of each host in
   * the distribution is the sum of the block lengths of the region's files on that host, so the
   * top host is the server holding the most bytes of the region's HFiles.
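   * <p>
   * If the table descriptor cannot be loaded or an {@link IOException} occurs, an empty
   * distribution is returned rather than {@code null}, so callers can use the result
   * unconditionally.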
   * @param region the region to compute the block distribution for
   * @return the {@link HDFSBlocksDistribution} for the region, or an empty distribution on error
   */
  private HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
    try {
      TableDescriptor tableDescriptor = getDescriptor(region.getTable());
      if (tableDescriptor != null) {
        HDFSBlocksDistribution blocksDistribution =
          provider.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
        return blocksDistribution;
      }
    } catch (IOException ioe) {
      LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
        region.getEncodedName(), ioe);
    }

    return EMPTY_BLOCK_DISTRIBUTION;
  }

  /**
   * Return the {@link TableDescriptor} for the given table name.
   * @param tableName the table name
   */
  private TableDescriptor getDescriptor(TableName tableName) throws IOException {
    ClusterInfoProvider service = this.provider;
    if (service == null) {
      return null;
    }
    return service.getTableDescriptor(tableName);
  }

  /**
   * Map hostnames to ServerNames. The output list has the same order as the input hosts.
   * @param hosts the list of hosts
   * @return ServerName list
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*|.*/RegionHDFSBlockLocationFinder.java")
  @SuppressWarnings("MixedMutabilityReturnType")
  List<ServerName> mapHostNameToServerName(List<String> hosts) {
    if (hosts == null || status == null) {
      if (hosts == null) {
        LOG.warn("RegionLocationFinder top hosts is null");
      }
      return Collections.emptyList();
    }

    List<ServerName> topServerNames = new ArrayList<>();
    Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();

    // create a mapping from hostname to ServerName for fast lookup
    Map<String, List<ServerName>> hostToServerName = new HashMap<>();
    for (ServerName sn : regionServers) {
      hostToServerName.computeIfAbsent(sn.getHostname(), k -> new ArrayList<>()).add(sn);
    }

    for (String host : hosts) {
      if (!hostToServerName.containsKey(host)) {
        // It is possible that HDFS is healthy (thus the host appears in the block report) but no
        // live region server runs on it; such hosts are skipped.
        continue;
      }
      topServerNames.addAll(hostToServerName.get(host));
    }
    return topServerNames;
  }

  HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
    HDFSBlocksDistribution blockDistbn = null;
    try {
      if (cache.asMap().containsKey(hri)) {
        blockDistbn = cache.get(hri);
        return blockDistbn;
      } else {
        LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString());
        blockDistbn = internalGetTopBlockLocation(hri);
        cache.put(hri, blockDistbn);
        return blockDistbn;
      }
    } catch (ExecutionException e) {
      LOG.warn("Error while fetching cache entry", e);
      blockDistbn = internalGetTopBlockLocation(hri);
      cache.put(hri, blockDistbn);
      return blockDistbn;
    }
  }

  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) {
    try {
      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
    } catch (Exception e) {
      return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
    }
  }

  void refreshAndWait(Collection<RegionInfo> hris) {
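    // Kick off all refreshes in parallel first, then block on each future in submission order so
    // the cache is fully populated before returning.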
    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
      new ArrayList<>(hris.size());
    for (RegionInfo hregionInfo : hris) {
      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
    }
    int index = 0;
    for (RegionInfo hregionInfo : hris) {
      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
      try {
        cache.put(hregionInfo, future.get());
      } catch (InterruptedException ite) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException ee) {
        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
          hregionInfo.getEncodedName(), ee);
      }
      index++;
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
    return cache;
  }
}