/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that walks over the filesystem, and computes the mappings Region -> BestHost and Region ->
 * {@code Map<HostName, fractional-locality-of-region>}
 * <p>
 * One instance scans a single region directory: it lists the column family directories under the
 * region path, asks the filesystem for the block locations of every store file, and counts how
 * many blocks each host holds. Either result map may be {@code null}, in which case that result
 * is simply not produced.
 */
@InterfaceAudience.Private
class FSRegionScanner implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(FSRegionScanner.class);

  /**
   * Directory of the region being scanned. Its name is used as the region's encoded name and its
   * parent's name as the table name.
   */
  private final Path regionPath;

  /**
   * The file system used
   */
  private final FileSystem fs;

  /**
   * Maps each region to the RS with highest locality for that region. Keys are written as
   * {@code tableName:regionName}. May be {@code null}.
   */
  private final Map<String, String> regionToBestLocalityRSMapping;

  /**
   * Maps region encoded names to maps of hostnames to fractional locality of that region on that
   * host. May be {@code null}.
   */
  private final Map<String, Map<String, Float>> regionDegreeLocalityMapping;

  /**
   * @param fs                            file system holding the region directory
   * @param regionPath                    path of the region directory to scan
   * @param regionToBestLocalityRSMapping output map of {@code table:region} to best host, or
   *                                      {@code null} to skip that result
   * @param regionDegreeLocalityMapping   output map of region name to per-host locality fractions,
   *                                      or {@code null} to skip that result
   */
  FSRegionScanner(FileSystem fs, Path regionPath, Map<String, String> regionToBestLocalityRSMapping,
    Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
    this.fs = fs;
    this.regionPath = regionPath;
    this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
    this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
  }

  /**
   * Scans the region directory and publishes the results into the non-null output maps. Any
   * {@link IOException} or {@link RuntimeException} raised while scanning is logged at WARN and
   * swallowed, so a failure on one region never propagates to the caller.
   */
  @Override
  public void run() {
    try {
      // get table name
      String tableName = regionPath.getParent().getName();

      // fresh per-region tally of blocks hosted on each host
      Map<String, AtomicInteger> blockCountMap = new HashMap<>();
      int totalBlkCount = countBlocksPerHost(blockCountMap);

      if (regionToBestLocalityRSMapping != null) {
        String hostToRun = findBestHost(blockCountMap);

        // empty regions could make this null; nothing at all is recorded for such a region
        if (null == hostToRun) {
          return;
        }

        String name = tableName + ":" + regionPath.getName();
        String bestHost = stripTrailingDot(hostToRun);
        synchronized (regionToBestLocalityRSMapping) {
          regionToBestLocalityRSMapping.put(name, bestHost);
        }
      }

      if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
        // Put the locality map into the result map, keyed by the encoded name
        // of the region.
        // NOTE(review): unlike the best-host map above, this put is not synchronized here;
        // presumably callers pass a thread-safe map - confirm against the call sites.
        regionDegreeLocalityMapping.put(regionPath.getName(),
          computeLocalityFractions(blockCountMap, totalBlkCount));
      }
    } catch (IOException | RuntimeException e) {
      LOG.warn("Problem scanning file system for region path {}", regionPath, e);
    }
  }

  /**
   * Tallies, for every store file under every column family directory of the region, how many
   * blocks each host holds.
   * @param blockCountMap receives the per-host block counts, keyed by the host name exactly as
   *                      reported by the block location
   * @return total number of blocks seen; 0 when the region has no listable family directories
   * @throws IOException if listing directories or fetching block locations fails
   */
  private int countBlocksPerHost(Map<String, AtomicInteger> blockCountMap) throws IOException {
    int totalBlkCount = 0;

    FileStatus[] cfList = fs.listStatus(regionPath, new FSUtils.FamilyDirFilter(fs));
    if (null == cfList) {
      // ignore null: with no counts and a zero total the caller publishes nothing
      return totalBlkCount;
    }

    // for each cf, get all the blocks information
    for (FileStatus cfStatus : cfList) {
      if (!cfStatus.isDirectory()) {
        // skip because this is not a CF directory
        continue;
      }

      FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
      if (null == storeFileLists) {
        continue;
      }

      for (FileStatus storeFile : storeFileLists) {
        BlockLocation[] blkLocations = fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
        if (null == blkLocations) {
          continue;
        }

        totalBlkCount += blkLocations.length;
        for (BlockLocation blk : blkLocations) {
          for (String host : blk.getHosts()) {
            AtomicInteger count = blockCountMap.get(host);
            if (count == null) {
              count = new AtomicInteger(0);
              blockCountMap.put(host, count);
            }
            count.incrementAndGet();
          }
        }
      }
    }
    return totalBlkCount;
  }

  /**
   * Picks the host with the strictly largest block count.
   * @param blockCountMap per-host block counts
   * @return the host name as stored in the map, or {@code null} if the map is empty; on ties the
   *         first host met in the map's iteration order wins
   */
  private static String findBestHost(Map<String, AtomicInteger> blockCountMap) {
    int largestBlkCount = 0;
    String hostToRun = null;
    for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
      int blkCount = entry.getValue().get();
      if (blkCount > largestBlkCount) {
        largestBlkCount = blkCount;
        hostToRun = entry.getKey();
      }
    }
    return hostToRun;
  }

  /**
   * Converts per-host block counts into fractions of the region's total block count.
   * @param blockCountMap per-host block counts
   * @param totalBlkCount total number of blocks in the region; must be greater than zero
   * @return new map from host name (trailing dot removed) to fraction of blocks on that host
   */
  private static Map<String, Float> computeLocalityFractions(
    Map<String, AtomicInteger> blockCountMap, int totalBlkCount) {
    Map<String, Float> hostLocalityMap = new HashMap<>();
    for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
      // Locality is fraction of blocks local to this host.
      float locality = ((float) entry.getValue().get()) / totalBlkCount;
      hostLocalityMap.put(stripTrailingDot(entry.getKey()), locality);
    }
    return hostLocalityMap;
  }

  /**
   * Removes a single trailing {@code '.'} from a host name, if present.
   * @param host host name, not {@code null}
   * @return {@code host} without its final dot, or {@code host} unchanged when it has none
   */
  private static String stripTrailingDot(String host) {
    if (host.endsWith(".")) {
      return host.substring(0, host.length() - 1);
    }
    return host;
  }
}