001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Executors;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ClusterMetrics;
033import org.apache.hadoop.hbase.HDFSBlocksDistribution;
034import org.apache.hadoop.hbase.RegionMetrics;
035import org.apache.hadoop.hbase.ServerMetrics;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
049import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
050import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
056import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
057
058/**
059 * This will find where data for a region is located in HDFS. It ranks {@link ServerName}'s by the
060 * size of the store files they are holding for a given region.
061 */
062@InterfaceAudience.Private
063class RegionLocationFinder {
064  private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class);
065  private static final long CACHE_TIME = 240 * 60 * 1000;
066  private static final float EPSILON = 0.0001f;
067  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
068    new HDFSBlocksDistribution();
069
070  private Configuration conf;
071  private volatile ClusterMetrics status;
072  private MasterServices services;
073  private final ListeningExecutorService executor;
074  // Do not scheduleFullRefresh at master startup
075  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
076
077  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
078    new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {
079
080      @Override
081      public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
082        HDFSBlocksDistribution oldValue) throws Exception {
083        return executor.submit(new Callable<HDFSBlocksDistribution>() {
084          @Override
085          public HDFSBlocksDistribution call() throws Exception {
086            return internalGetTopBlockLocation(hri);
087          }
088        });
089      }
090
091      @Override
092      public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
093        return internalGetTopBlockLocation(key);
094      }
095    };
096
097  // The cache for where regions are located.
098  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;
099
100  RegionLocationFinder() {
101    this.cache = createCache();
102    executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5,
103      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("region-location-%d").build()));
104  }
105
106  /**
107   * Create a cache for region to list of servers
108   * @return A new Cache.
109   */
110  private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
111    return CacheBuilder.newBuilder().expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
112      .build(loader);
113  }
114
115  public Configuration getConf() {
116    return conf;
117  }
118
119  public void setConf(Configuration conf) {
120    this.conf = conf;
121  }
122
123  public void setServices(MasterServices services) {
124    this.services = services;
125  }
126
127  public void setClusterMetrics(ClusterMetrics status) {
128    long currentTime = EnvironmentEdgeManager.currentTime();
129
130    if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
131      this.status = status;
132      // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
133      lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
134    } else {
135      refreshLocalityChangedRegions(this.status, status);
136      this.status = status;
137    }
138  }
139
140  /**
141   * If locality for a region has changed, that pretty certainly means our cache is out of date.
142   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
143   */
144  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
145    if (oldStatus == null || newStatus == null) {
146      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus,
147        newStatus);
148      return;
149    }
150
151    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
152    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
153
154    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
155    for (RegionInfo regionInfo : cache.asMap().keySet()) {
156      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
157    }
158
159    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
160      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
161      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
162        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
163        RegionInfo region = regionsByName.get(encodedName);
164        if (region == null) {
165          continue;
166        }
167
168        float newLocality = regionEntry.getValue().getDataLocality();
169        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
170
171        if (Math.abs(newLocality - oldLocality) > EPSILON) {
172          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
173            region.getEncodedName(), oldLocality, newLocality);
174          cache.refresh(region);
175        }
176      }
177
178    }
179  }
180
181  private float getOldLocality(ServerName newServer, byte[] regionName,
182    Map<ServerName, ServerMetrics> oldServers) {
183    ServerMetrics serverMetrics = oldServers.get(newServer);
184    if (serverMetrics == null) {
185      return -1f;
186    }
187    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
188    if (regionMetrics == null) {
189      return -1f;
190    }
191
192    return regionMetrics.getDataLocality();
193  }
194
195  /**
196   * Refresh all the region locations.
197   * @return true if user created regions got refreshed.
198   */
199  private boolean scheduleFullRefresh() {
200    // Protect from anything being null while starting up.
201    if (services == null) {
202      return false;
203    }
204
205    final AssignmentManager am = services.getAssignmentManager();
206    if (am == null) {
207      return false;
208    }
209
210    // TODO: Should this refresh all the regions or only the ones assigned?
211    boolean includesUserTables = false;
212    for (final RegionInfo hri : am.getAssignedRegions()) {
213      cache.refresh(hri);
214      includesUserTables = includesUserTables || !hri.getTable().isSystemTable();
215    }
216    return includesUserTables;
217  }
218
219  protected List<ServerName> getTopBlockLocations(RegionInfo region) {
220    List<String> topHosts = getBlockDistribution(region).getTopHosts();
221    return mapHostNameToServerName(topHosts);
222  }
223
224  /**
225   * Returns an ordered list of hosts which have better locality for this region than the current
226   * host.
227   */
228  protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) {
229    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
230    List<String> topHosts = new ArrayList<>();
231    for (String host : blocksDistribution.getTopHosts()) {
232      if (host.equals(currentHost)) {
233        break;
234      }
235      topHosts.add(host);
236    }
237    return mapHostNameToServerName(topHosts);
238  }
239
240  /**
241   * Returns an ordered list of hosts that are hosting the blocks for this region. The weight of
242   * each host is the sum of the block lengths of all files on that host, so the first host in the
243   * list is the server which holds the most bytes of the given region's HFiles.
244   * @param region region
245   * @return ordered list of hosts holding blocks of the specified region
246   */
247  protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
248    try {
249      TableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
250      if (tableDescriptor != null) {
251        HDFSBlocksDistribution blocksDistribution =
252          HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
253        return blocksDistribution;
254      }
255    } catch (IOException ioe) {
256      LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
257        region.getEncodedName(), ioe);
258    }
259
260    return EMPTY_BLOCK_DISTRIBUTION;
261  }
262
263  /**
264   * return TableDescriptor for a given tableName
265   * @param tableName the table name nn
266   */
267  protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
268    TableDescriptor tableDescriptor = null;
269    try {
270      if (this.services != null && this.services.getTableDescriptors() != null) {
271        tableDescriptor = this.services.getTableDescriptors().get(tableName);
272      }
273    } catch (FileNotFoundException fnfe) {
274      LOG.debug("tableName={}", tableName, fnfe);
275    }
276
277    return tableDescriptor;
278  }
279
280  /**
281   * Map hostname to ServerName, The output ServerName list will have the same order as input hosts.
282   * @param hosts the list of hosts
283   * @return ServerName list
284   */
285  List<ServerName> mapHostNameToServerName(List<String> hosts) {
286    if (hosts == null || status == null) {
287      if (hosts == null) {
288        LOG.warn("RegionLocationFinder top hosts is null");
289      }
290      return Lists.newArrayList();
291    }
292
293    List<ServerName> topServerNames = new ArrayList<>();
294    Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
295
296    // create a mapping from hostname to ServerName for fast lookup
297    HashMap<String, List<ServerName>> hostToServerName = new HashMap<>();
298    for (ServerName sn : regionServers) {
299      String host = sn.getHostname();
300      if (!hostToServerName.containsKey(host)) {
301        hostToServerName.put(host, new ArrayList<>());
302      }
303      hostToServerName.get(host).add(sn);
304    }
305
306    for (String host : hosts) {
307      if (!hostToServerName.containsKey(host)) {
308        continue;
309      }
310      for (ServerName sn : hostToServerName.get(host)) {
311        // it is possible that HDFS is up ( thus host is valid ),
312        // but RS is down ( thus sn is null )
313        if (sn != null) {
314          topServerNames.add(sn);
315        }
316      }
317    }
318    return topServerNames;
319  }
320
321  public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
322    HDFSBlocksDistribution blockDistbn = null;
323    try {
324      if (cache.asMap().containsKey(hri)) {
325        blockDistbn = cache.get(hri);
326        return blockDistbn;
327      } else {
328        LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString());
329        blockDistbn = internalGetTopBlockLocation(hri);
330        cache.put(hri, blockDistbn);
331        return blockDistbn;
332      }
333    } catch (ExecutionException e) {
334      LOG.warn("Error while fetching cache entry ", e);
335      blockDistbn = internalGetTopBlockLocation(hri);
336      cache.put(hri, blockDistbn);
337      return blockDistbn;
338    }
339  }
340
341  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) {
342    try {
343      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
344    } catch (Exception e) {
345      return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
346    }
347  }
348
349  public void refreshAndWait(Collection<RegionInfo> hris) {
350    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
351      new ArrayList<>(hris.size());
352    for (RegionInfo hregionInfo : hris) {
353      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
354    }
355    int index = 0;
356    for (RegionInfo hregionInfo : hris) {
357      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
358      try {
359        cache.put(hregionInfo, future.get());
360      } catch (InterruptedException ite) {
361        Thread.currentThread().interrupt();
362      } catch (ExecutionException ee) {
363        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
364          hregionInfo.getEncodedName(), ee);
365      }
366      index++;
367    }
368  }
369
370  // For test
371  LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
372    return cache;
373  }
374}