001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.conf.Configured;
033import org.apache.hadoop.hbase.ClusterMetrics;
034import org.apache.hadoop.hbase.HDFSBlocksDistribution;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
047import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
048import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055/**
056 * This will find where data for a region is located in HDFS. It ranks {@link ServerName}'s by the
057 * size of the store files they are holding for a given region.
058 */
059@InterfaceAudience.Private
060class RegionHDFSBlockLocationFinder extends Configured {
061  private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
062  private static final long CACHE_TIME = 240 * 60 * 1000;
063  private static final float EPSILON = 0.0001f;
064  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
065    new HDFSBlocksDistribution();
066  private volatile ClusterMetrics status;
067  private volatile ClusterInfoProvider provider;
068  private final ListeningExecutorService executor;
069  // Do not scheduleFullRefresh at master startup
070  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
071
072  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
073    new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {
074
075      @Override
076      public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
077        HDFSBlocksDistribution oldValue) throws Exception {
078        return executor.submit(new Callable<HDFSBlocksDistribution>() {
079          @Override
080          public HDFSBlocksDistribution call() throws Exception {
081            return internalGetTopBlockLocation(hri);
082          }
083        });
084      }
085
086      @Override
087      public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
088        return internalGetTopBlockLocation(key);
089      }
090    };
091
092  // The cache for where regions are located.
093  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;
094
095  RegionHDFSBlockLocationFinder() {
096    this.cache = createCache();
097    executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5,
098      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("region-location-%d").build()));
099  }
100
101  /**
102   * Create a cache for region to list of servers
103   * @return A new Cache.
104   */
105  private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
106    return CacheBuilder.newBuilder().expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
107      .build(loader);
108  }
109
110  void setClusterInfoProvider(ClusterInfoProvider provider) {
111    this.provider = provider;
112  }
113
114  void setClusterMetrics(ClusterMetrics status) {
115    long currentTime = EnvironmentEdgeManager.currentTime();
116
117    if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
118      this.status = status;
119      // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
120      lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
121    } else {
122      refreshLocalityChangedRegions(this.status, status);
123      this.status = status;
124    }
125  }
126
127  /**
128   * If locality for a region has changed, that pretty certainly means our cache is out of date.
129   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
130   */
131  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
132    if (oldStatus == null || newStatus == null) {
133      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus,
134        newStatus);
135      return;
136    }
137
138    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
139    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
140
141    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
142    for (RegionInfo regionInfo : cache.asMap().keySet()) {
143      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
144    }
145
146    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
147      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
148      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
149        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
150        RegionInfo region = regionsByName.get(encodedName);
151        if (region == null) {
152          continue;
153        }
154
155        float newLocality = regionEntry.getValue().getDataLocality();
156        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
157
158        if (Math.abs(newLocality - oldLocality) > EPSILON) {
159          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
160            region.getEncodedName(), oldLocality, newLocality);
161          cache.refresh(region);
162        }
163      }
164
165    }
166  }
167
168  private float getOldLocality(ServerName newServer, byte[] regionName,
169    Map<ServerName, ServerMetrics> oldServers) {
170    ServerMetrics serverMetrics = oldServers.get(newServer);
171    if (serverMetrics == null) {
172      return -1f;
173    }
174    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
175    if (regionMetrics == null) {
176      return -1f;
177    }
178
179    return regionMetrics.getDataLocality();
180  }
181
182  /**
183   * Refresh all the region locations.
184   * @return true if user created regions got refreshed.
185   */
186  private boolean scheduleFullRefresh() {
187    ClusterInfoProvider service = this.provider;
188    // Protect from anything being null while starting up.
189    if (service == null) {
190      return false;
191    }
192
193    // TODO: Should this refresh all the regions or only the ones assigned?
194    boolean includesUserTables = false;
195    for (final RegionInfo hri : service.getAssignedRegions()) {
196      cache.refresh(hri);
197      includesUserTables |= !hri.getTable().isSystemTable();
198    }
199    return includesUserTables;
200  }
201
202  List<ServerName> getTopBlockLocations(RegionInfo region) {
203    List<String> topHosts = getBlockDistribution(region).getTopHosts();
204    return mapHostNameToServerName(topHosts);
205  }
206
207  /**
208   * Returns an ordered list of hosts that are hosting the blocks for this region. The weight of
209   * each host is the sum of the block lengths of all files on that host, so the first host in the
210   * list is the server which holds the most bytes of the given region's HFiles.
211   * @param region region
212   * @return ordered list of hosts holding blocks of the specified region
213   */
214  private HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
215    try {
216      TableDescriptor tableDescriptor = getDescriptor(region.getTable());
217      if (tableDescriptor != null) {
218        HDFSBlocksDistribution blocksDistribution =
219          provider.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
220        return blocksDistribution;
221      }
222    } catch (IOException ioe) {
223      LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
224        region.getEncodedName(), ioe);
225    }
226
227    return EMPTY_BLOCK_DISTRIBUTION;
228  }
229
230  /**
231   * return TableDescriptor for a given tableName
232   * @param tableName the table name
233   */
234  private TableDescriptor getDescriptor(TableName tableName) throws IOException {
235    ClusterInfoProvider service = this.provider;
236    if (service == null) {
237      return null;
238    }
239    return service.getTableDescriptor(tableName);
240  }
241
242  /**
243   * Map hostname to ServerName, The output ServerName list will have the same order as input hosts.
244   * @param hosts the list of hosts
245   * @return ServerName list
246   */
247  @RestrictedApi(explanation = "Should only be called in tests", link = "",
248      allowedOnPath = ".*/src/test/.*|.*/RegionHDFSBlockLocationFinder.java")
249  @SuppressWarnings("MixedMutabilityReturnType")
250  List<ServerName> mapHostNameToServerName(List<String> hosts) {
251    if (hosts == null || status == null) {
252      if (hosts == null) {
253        LOG.warn("RegionLocationFinder top hosts is null");
254      }
255      return Collections.emptyList();
256    }
257
258    List<ServerName> topServerNames = new ArrayList<>();
259    Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
260
261    // create a mapping from hostname to ServerName for fast lookup
262    Map<String, List<ServerName>> hostToServerName = new HashMap<>();
263    for (ServerName sn : regionServers) {
264      String host = sn.getHostname();
265      if (!hostToServerName.containsKey(host)) {
266        hostToServerName.put(host, new ArrayList<>());
267      }
268      hostToServerName.get(host).add(sn);
269    }
270
271    for (String host : hosts) {
272      if (!hostToServerName.containsKey(host)) {
273        continue;
274      }
275      for (ServerName sn : hostToServerName.get(host)) {
276        // it is possible that HDFS is up ( thus host is valid ),
277        // but RS is down ( thus sn is null )
278        if (sn != null) {
279          topServerNames.add(sn);
280        }
281      }
282    }
283    return topServerNames;
284  }
285
286  HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
287    HDFSBlocksDistribution blockDistbn = null;
288    try {
289      if (cache.asMap().containsKey(hri)) {
290        blockDistbn = cache.get(hri);
291        return blockDistbn;
292      } else {
293        LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString());
294        blockDistbn = internalGetTopBlockLocation(hri);
295        cache.put(hri, blockDistbn);
296        return blockDistbn;
297      }
298    } catch (ExecutionException e) {
299      LOG.warn("Error while fetching cache entry ", e);
300      blockDistbn = internalGetTopBlockLocation(hri);
301      cache.put(hri, blockDistbn);
302      return blockDistbn;
303    }
304  }
305
306  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) {
307    try {
308      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
309    } catch (Exception e) {
310      return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
311    }
312  }
313
314  void refreshAndWait(Collection<RegionInfo> hris) {
315    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
316      new ArrayList<>(hris.size());
317    for (RegionInfo hregionInfo : hris) {
318      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
319    }
320    int index = 0;
321    for (RegionInfo hregionInfo : hris) {
322      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
323      try {
324        cache.put(hregionInfo, future.get());
325      } catch (InterruptedException ite) {
326        Thread.currentThread().interrupt();
327      } catch (ExecutionException ee) {
328        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
329          hregionInfo.getEncodedName(), ee);
330      }
331      index++;
332    }
333  }
334
335  @RestrictedApi(explanation = "Should only be called in tests", link = "",
336      allowedOnPath = ".*/src/test/.*")
337  LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
338    return cache;
339  }
340}