001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.List;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HDFSBlocksDistribution;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.master.MasterServices;
038import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
045import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
046import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053
054/**
055 * This will find where data for a region is located in HDFS. It ranks
056 * {@link ServerName}'s by the size of the store files they are holding for a
057 * given region.
058 *
059 */
060@InterfaceAudience.Private
061class RegionLocationFinder {
062  private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class);
063  private static final long CACHE_TIME = 240 * 60 * 1000;
064  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
065  private Configuration conf;
066  private volatile ClusterMetrics status;
067  private MasterServices services;
068  private final ListeningExecutorService executor;
069  // Do not scheduleFullRefresh at master startup
070  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
071
072  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
073      new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {
074
075    @Override
076    public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
077        HDFSBlocksDistribution oldValue) throws Exception {
078      return executor.submit(new Callable<HDFSBlocksDistribution>() {
079        @Override
080        public HDFSBlocksDistribution call() throws Exception {
081          return internalGetTopBlockLocation(hri);
082        }
083      });
084    }
085
086    @Override
087    public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
088      return internalGetTopBlockLocation(key);
089    }
090  };
091
092  // The cache for where regions are located.
093  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;
094
095  RegionLocationFinder() {
096    this.cache = createCache();
097    executor = MoreExecutors.listeningDecorator(
098        Executors.newScheduledThreadPool(
099            5,
100            new ThreadFactoryBuilder().
101                setDaemon(true)
102                .setNameFormat("region-location-%d")
103                .build()));
104  }
105
106  /**
107   * Create a cache for region to list of servers
108   * @return A new Cache.
109   */
110  private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
111    return CacheBuilder.newBuilder()
112        .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
113        .build(loader);
114  }
115
116  public Configuration getConf() {
117    return conf;
118  }
119
120  public void setConf(Configuration conf) {
121    this.conf = conf;
122  }
123
124  public void setServices(MasterServices services) {
125    this.services = services;
126  }
127
128  public void setClusterMetrics(ClusterMetrics status) {
129    long currentTime = EnvironmentEdgeManager.currentTime();
130    this.status = status;
131    if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
132      // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
133      lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
134    }
135
136  }
137
138  /**
139   * Refresh all the region locations.
140   *
141   * @return true if user created regions got refreshed.
142   */
143  private boolean scheduleFullRefresh() {
144    // Protect from anything being null while starting up.
145    if (services == null) {
146      return false;
147    }
148
149    final AssignmentManager am = services.getAssignmentManager();
150    if (am == null) {
151      return false;
152    }
153
154    // TODO: Should this refresh all the regions or only the ones assigned?
155    boolean includesUserTables = false;
156    for (final RegionInfo hri : am.getAssignedRegions()) {
157      cache.refresh(hri);
158      includesUserTables = includesUserTables || !hri.getTable().isSystemTable();
159    }
160    return includesUserTables;
161  }
162
163  protected List<ServerName> getTopBlockLocations(RegionInfo region) {
164    List<String> topHosts = getBlockDistribution(region).getTopHosts();
165    return mapHostNameToServerName(topHosts);
166  }
167
168  /**
169   * Returns an ordered list of hosts which have better locality for this region
170   * than the current host.
171   */
172  protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) {
173    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
174    List<String> topHosts = new ArrayList<>();
175    for (String host : blocksDistribution.getTopHosts()) {
176      if (host.equals(currentHost)) {
177        break;
178      }
179      topHosts.add(host);
180    }
181    return mapHostNameToServerName(topHosts);
182  }
183
184  /**
185   * Returns an ordered list of hosts that are hosting the blocks for this
186   * region. The weight of each host is the sum of the block lengths of all
187   * files on that host, so the first host in the list is the server which holds
188   * the most bytes of the given region's HFiles.
189   *
190   * @param region region
191   * @return ordered list of hosts holding blocks of the specified region
192   */
193  protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
194    try {
195      TableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
196      if (tableDescriptor != null) {
197        HDFSBlocksDistribution blocksDistribution =
198            HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
199        return blocksDistribution;
200      }
201    } catch (IOException ioe) {
202      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
203          + region.getEncodedName(), ioe);
204    }
205
206    return EMPTY_BLOCK_DISTRIBUTION;
207  }
208
209  /**
210   * return TableDescriptor for a given tableName
211   *
212   * @param tableName the table name
213   * @return TableDescriptor
214   * @throws IOException
215   */
216  protected TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
217    TableDescriptor tableDescriptor = null;
218    try {
219      if (this.services != null && this.services.getTableDescriptors() != null) {
220        tableDescriptor = this.services.getTableDescriptors().get(tableName);
221      }
222    } catch (FileNotFoundException fnfe) {
223      LOG.debug("tableName={}", tableName, fnfe);
224    }
225
226    return tableDescriptor;
227  }
228
229  /**
230   * Map hostname to ServerName, The output ServerName list will have the same
231   * order as input hosts.
232   *
233   * @param hosts the list of hosts
234   * @return ServerName list
235   */
236  protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
237    if (hosts == null || status == null) {
238      if (hosts == null) {
239        LOG.warn("RegionLocationFinder top hosts is null");
240      }
241      return Lists.newArrayList();
242    }
243
244    List<ServerName> topServerNames = new ArrayList<>();
245    Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
246
247    // create a mapping from hostname to ServerName for fast lookup
248    HashMap<String, List<ServerName>> hostToServerName = new HashMap<>();
249    for (ServerName sn : regionServers) {
250      String host = sn.getHostname();
251      if (!hostToServerName.containsKey(host)) {
252        hostToServerName.put(host, new ArrayList<>());
253      }
254      hostToServerName.get(host).add(sn);
255    }
256
257    for (String host : hosts) {
258      if (!hostToServerName.containsKey(host)) {
259        continue;
260      }
261      for (ServerName sn : hostToServerName.get(host)) {
262        // it is possible that HDFS is up ( thus host is valid ),
263        // but RS is down ( thus sn is null )
264        if (sn != null) {
265          topServerNames.add(sn);
266        }
267      }
268    }
269    return topServerNames;
270  }
271
272  public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
273    HDFSBlocksDistribution blockDistbn = null;
274    try {
275      if (cache.asMap().containsKey(hri)) {
276        blockDistbn = cache.get(hri);
277        return blockDistbn;
278      } else {
279        LOG.trace("HDFSBlocksDistribution not found in cache for {}", hri.getRegionNameAsString());
280        blockDistbn = internalGetTopBlockLocation(hri);
281        cache.put(hri, blockDistbn);
282        return blockDistbn;
283      }
284    } catch (ExecutionException e) {
285      LOG.warn("Error while fetching cache entry ", e);
286      blockDistbn = internalGetTopBlockLocation(hri);
287      cache.put(hri, blockDistbn);
288      return blockDistbn;
289    }
290  }
291
292  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
293      RegionInfo hri) {
294    try {
295      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
296    } catch (Exception e) {
297      return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
298    }
299  }
300
301  public void refreshAndWait(Collection<RegionInfo> hris) {
302    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
303        new ArrayList<>(hris.size());
304    for (RegionInfo hregionInfo : hris) {
305      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
306    }
307    int index = 0;
308    LOG.info("Refreshing block distribution cache for {} regions (Can take a while on big cluster)",
309        hris.size());
310    for (RegionInfo hregionInfo : hris) {
311      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
312          .get(index);
313      try {
314        cache.put(hregionInfo, future.get());
315      } catch (InterruptedException ite) {
316        Thread.currentThread().interrupt();
317      } catch (ExecutionException ee) {
318        LOG.debug(
319            "ExecutionException during HDFSBlocksDistribution computation. for region = "
320                + hregionInfo.getEncodedName(), ee);
321      }
322      index++;
323    }
324    LOG.info("Finished refreshing block distribution cache for {} regions", hris.size());
325  }
326
327  // For test
328  LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
329    return cache;
330  }
331}