001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.IOException; 022import java.util.HashMap; 023import java.util.Map; 024 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030import org.apache.hadoop.fs.BlockLocation; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.util.FSUtils; 035 036/** 037 * Thread that walks over the filesystem, and computes the mappings 038 * Region -> BestHost and Region -> {@code Map<HostName, fractional-locality-of-region>} 039 * 040 */ 041@InterfaceAudience.Private 042class FSRegionScanner implements Runnable { 043 static private final Logger LOG = LoggerFactory.getLogger(FSRegionScanner.class); 044 045 private Path regionPath; 046 047 /** 048 * The file system used 049 */ 050 private FileSystem fs; 051 052 /** 053 * Maps each region to the RS with highest locality for that region. 054 */ 055 private final Map<String,String> regionToBestLocalityRSMapping; 056 057 /** 058 * Maps region encoded names to maps of hostnames to fractional locality of 059 * that region on that host. 060 */ 061 private Map<String, Map<String, Float>> regionDegreeLocalityMapping; 062 063 FSRegionScanner(FileSystem fs, Path regionPath, 064 Map<String, String> regionToBestLocalityRSMapping, 065 Map<String, Map<String, Float>> regionDegreeLocalityMapping) { 066 this.fs = fs; 067 this.regionPath = regionPath; 068 this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping; 069 this.regionDegreeLocalityMapping = regionDegreeLocalityMapping; 070 } 071 072 @Override 073 public void run() { 074 try { 075 // empty the map for each region 076 Map<String, AtomicInteger> blockCountMap = new HashMap<>(); 077 078 //get table name 079 String tableName = regionPath.getParent().getName(); 080 int totalBlkCount = 0; 081 082 // ignore null 083 FileStatus[] cfList = fs.listStatus(regionPath, new FSUtils.FamilyDirFilter(fs)); 084 if (null == cfList) { 085 return; 086 } 087 088 // for each cf, get all the blocks information 089 for (FileStatus cfStatus : cfList) { 090 if (!cfStatus.isDirectory()) { 091 // skip because this is not a CF directory 092 continue; 093 } 094 095 FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath()); 096 if (null == storeFileLists) { 097 continue; 098 } 099 100 for (FileStatus storeFile : storeFileLists) { 101 BlockLocation[] blkLocations = 102 fs.getFileBlockLocations(storeFile, 0, storeFile.getLen()); 103 if (null == blkLocations) { 104 continue; 105 } 106 107 totalBlkCount += blkLocations.length; 108 for(BlockLocation blk: blkLocations) { 109 for (String host: blk.getHosts()) { 110 AtomicInteger count = blockCountMap.get(host); 111 if (count == null) { 112 count = new AtomicInteger(0); 113 blockCountMap.put(host, count); 114 } 115 count.incrementAndGet(); 116 } 117 } 118 } 119 } 120 121 if (regionToBestLocalityRSMapping != null) { 122 int largestBlkCount = 0; 123 String hostToRun = null; 124 for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) { 125 String host = entry.getKey(); 126 127 int tmp = entry.getValue().get(); 128 if (tmp > largestBlkCount) { 129 largestBlkCount = tmp; 130 hostToRun = host; 131 } 132 } 133 134 // empty regions could make this null 135 if (null == hostToRun) { 136 return; 137 } 138 139 if (hostToRun.endsWith(".")) { 140 hostToRun = hostToRun.substring(0, hostToRun.length()-1); 141 } 142 String name = tableName + ":" + regionPath.getName(); 143 synchronized (regionToBestLocalityRSMapping) { 144 regionToBestLocalityRSMapping.put(name, hostToRun); 145 } 146 } 147 148 if (regionDegreeLocalityMapping != null && totalBlkCount > 0) { 149 Map<String, Float> hostLocalityMap = new HashMap<>(); 150 for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) { 151 String host = entry.getKey(); 152 if (host.endsWith(".")) { 153 host = host.substring(0, host.length() - 1); 154 } 155 // Locality is fraction of blocks local to this host. 156 float locality = ((float)entry.getValue().get()) / totalBlkCount; 157 hostLocalityMap.put(host, locality); 158 } 159 // Put the locality map into the result map, keyed by the encoded name 160 // of the region. 161 regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap); 162 } 163 } catch (IOException e) { 164 LOG.warn("Problem scanning file system", e); 165 } catch (RuntimeException e) { 166 LOG.warn("Problem scanning file system", e); 167 } 168 } 169}