001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.fs.BlockLocation;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Thread that walks over the filesystem, and computes the mappings Region -> BestHost and Region ->
034 * {@code Map<HostName, fractional-locality-of-region>}
035 */
036@InterfaceAudience.Private
037class FSRegionScanner implements Runnable {
038  static private final Logger LOG = LoggerFactory.getLogger(FSRegionScanner.class);
039
040  private Path regionPath;
041
042  /**
043   * The file system used
044   */
045  private FileSystem fs;
046
047  /**
048   * Maps each region to the RS with highest locality for that region.
049   */
050  private final Map<String, String> regionToBestLocalityRSMapping;
051
052  /**
053   * Maps region encoded names to maps of hostnames to fractional locality of that region on that
054   * host.
055   */
056  private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
057
058  FSRegionScanner(FileSystem fs, Path regionPath, Map<String, String> regionToBestLocalityRSMapping,
059    Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
060    this.fs = fs;
061    this.regionPath = regionPath;
062    this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
063    this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
064  }
065
066  @Override
067  public void run() {
068    try {
069      // empty the map for each region
070      Map<String, AtomicInteger> blockCountMap = new HashMap<>();
071
072      // get table name
073      String tableName = regionPath.getParent().getName();
074      int totalBlkCount = 0;
075
076      // ignore null
077      FileStatus[] cfList = fs.listStatus(regionPath, new FSUtils.FamilyDirFilter(fs));
078      if (null == cfList) {
079        return;
080      }
081
082      // for each cf, get all the blocks information
083      for (FileStatus cfStatus : cfList) {
084        if (!cfStatus.isDirectory()) {
085          // skip because this is not a CF directory
086          continue;
087        }
088
089        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
090        if (null == storeFileLists) {
091          continue;
092        }
093
094        for (FileStatus storeFile : storeFileLists) {
095          BlockLocation[] blkLocations = fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
096          if (null == blkLocations) {
097            continue;
098          }
099
100          totalBlkCount += blkLocations.length;
101          for (BlockLocation blk : blkLocations) {
102            for (String host : blk.getHosts()) {
103              AtomicInteger count = blockCountMap.get(host);
104              if (count == null) {
105                count = new AtomicInteger(0);
106                blockCountMap.put(host, count);
107              }
108              count.incrementAndGet();
109            }
110          }
111        }
112      }
113
114      if (regionToBestLocalityRSMapping != null) {
115        int largestBlkCount = 0;
116        String hostToRun = null;
117        for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
118          String host = entry.getKey();
119
120          int tmp = entry.getValue().get();
121          if (tmp > largestBlkCount) {
122            largestBlkCount = tmp;
123            hostToRun = host;
124          }
125        }
126
127        // empty regions could make this null
128        if (null == hostToRun) {
129          return;
130        }
131
132        if (hostToRun.endsWith(".")) {
133          hostToRun = hostToRun.substring(0, hostToRun.length() - 1);
134        }
135        String name = tableName + ":" + regionPath.getName();
136        synchronized (regionToBestLocalityRSMapping) {
137          regionToBestLocalityRSMapping.put(name, hostToRun);
138        }
139      }
140
141      if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
142        Map<String, Float> hostLocalityMap = new HashMap<>();
143        for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
144          String host = entry.getKey();
145          if (host.endsWith(".")) {
146            host = host.substring(0, host.length() - 1);
147          }
148          // Locality is fraction of blocks local to this host.
149          float locality = ((float) entry.getValue().get()) / totalBlkCount;
150          hostLocalityMap.put(host, locality);
151        }
152        // Put the locality map into the result map, keyed by the encoded name
153        // of the region.
154        regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
155      }
156    } catch (IOException e) {
157      LOG.warn("Problem scanning file system", e);
158    } catch (RuntimeException e) {
159      LOG.warn("Problem scanning file system", e);
160    }
161  }
162}