001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.util.HashMap; 020import java.util.HashSet; 021import java.util.Iterator; 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ScheduledChore; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.regionserver.HRegion; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.regionserver.Region; 032import org.apache.hadoop.hbase.regionserver.Store; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given {@link HRegionServer}. 040 */ 041@InterfaceAudience.Private 042public class FileSystemUtilizationChore extends ScheduledChore { 043 private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtilizationChore.class); 044 static final String FS_UTILIZATION_CHORE_PERIOD_KEY = "hbase.regionserver.quotas.fs.utilization.chore.period"; 045 static final int FS_UTILIZATION_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis 046 047 static final String FS_UTILIZATION_CHORE_DELAY_KEY = "hbase.regionserver.quotas.fs.utilization.chore.delay"; 048 static final long FS_UTILIZATION_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute 049 050 static final String FS_UTILIZATION_CHORE_TIMEUNIT_KEY = "hbase.regionserver.quotas.fs.utilization.chore.timeunit"; 051 static final String FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 052 053 static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY = "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis"; 054 static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L; 055 056 private int numberOfCyclesToSkip = 0, prevNumberOfCyclesToSkip = 0; 057 private static final int CYCLE_UPPER_BOUND = 32; 058 059 private final HRegionServer rs; 060 private final long maxIterationMillis; 061 private Iterator<Region> leftoverRegions; 062 063 public FileSystemUtilizationChore(HRegionServer rs) { 064 super(FileSystemUtilizationChore.class.getSimpleName(), rs, getPeriod(rs.getConfiguration()), 065 getInitialDelay(rs.getConfiguration()), getTimeUnit(rs.getConfiguration())); 066 this.rs = rs; 067 this.maxIterationMillis = rs.getConfiguration().getLong( 068 FS_UTILIZATION_MAX_ITERATION_DURATION_KEY, FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT); 069 } 070 071 @Override 072 protected void chore() { 073 if (numberOfCyclesToSkip > 0) { 074 numberOfCyclesToSkip--; 075 return; 076 } 077 final Map<RegionInfo, Long> onlineRegionSizes = new HashMap<>(); 078 final Set<Region> onlineRegions = new HashSet<>(rs.getRegions()); 079 // Process the regions from the last run if we have any. If we are somehow having difficulty 080 // processing the Regions, we want to avoid creating a backlog in memory of Region objs. 081 Iterator<Region> oldRegionsToProcess = getLeftoverRegions(); 082 final Iterator<Region> iterator; 083 final boolean processingLeftovers; 084 if (oldRegionsToProcess == null) { 085 iterator = onlineRegions.iterator(); 086 processingLeftovers = false; 087 } else { 088 iterator = oldRegionsToProcess; 089 processingLeftovers = true; 090 } 091 // Reset the leftoverRegions and let the loop re-assign if necessary. 092 setLeftoverRegions(null); 093 long regionSizesCalculated = 0L; 094 long offlineRegionsSkipped = 0L; 095 long skippedSplitParents = 0L; 096 long skippedRegionReplicas = 0L; 097 final long start = EnvironmentEdgeManager.currentTime(); 098 while (iterator.hasNext()) { 099 // Make sure this chore doesn't hog the thread. 100 long timeRunning = EnvironmentEdgeManager.currentTime() - start; 101 if (timeRunning > maxIterationMillis) { 102 LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the" 103 + " maximum iteration configuration value. Will process remaining iterators" 104 + " on a subsequent invocation."); 105 setLeftoverRegions(iterator); 106 break; 107 } 108 109 final Region region = iterator.next(); 110 // If we're processing leftover regions, the region may no-longer be online. 111 // If so, we can skip it. 112 if (processingLeftovers && !onlineRegions.contains(region)) { 113 offlineRegionsSkipped++; 114 continue; 115 } 116 // Avoid computing the size of regions which are the parent of split. 117 if (region.getRegionInfo().isSplitParent()) { 118 skippedSplitParents++; 119 continue; 120 } 121 // Avoid computing the size of region replicas. 122 if (RegionInfo.DEFAULT_REPLICA_ID != region.getRegionInfo().getReplicaId()) { 123 skippedRegionReplicas++; 124 continue; 125 } 126 final long sizeInBytes = computeSize(region); 127 onlineRegionSizes.put(region.getRegionInfo(), sizeInBytes); 128 regionSizesCalculated++; 129 } 130 if (LOG.isTraceEnabled()) { 131 LOG.trace("Computed the size of " + regionSizesCalculated + " Regions. Skipped computation" 132 + " of " + offlineRegionsSkipped + " regions due to not being online on this RS, " 133 + skippedSplitParents + " regions due to being the parent of a split, and" 134 + skippedRegionReplicas + " regions due to being region replicas."); 135 } 136 if (!reportRegionSizesToMaster(onlineRegionSizes)) { 137 // backoff reporting 138 numberOfCyclesToSkip = prevNumberOfCyclesToSkip > 0 ? 2 * prevNumberOfCyclesToSkip : 1; 139 if (numberOfCyclesToSkip > CYCLE_UPPER_BOUND) { 140 numberOfCyclesToSkip = CYCLE_UPPER_BOUND; 141 } 142 prevNumberOfCyclesToSkip = numberOfCyclesToSkip; 143 } 144 } 145 146 /** 147 * Returns an {@link Iterator} over the Regions which were skipped last invocation of the chore. 148 * 149 * @return Regions from the previous invocation to process, or null. 150 */ 151 Iterator<Region> getLeftoverRegions() { 152 return leftoverRegions; 153 } 154 155 /** 156 * Sets a new collection of Regions as leftovers. 157 */ 158 void setLeftoverRegions(Iterator<Region> newLeftovers) { 159 this.leftoverRegions = newLeftovers; 160 } 161 162 /** 163 * Computes total FileSystem size for the given {@link Region}. 164 * 165 * @param r The region 166 * @return The size, in bytes, of the Region. 167 */ 168 long computeSize(Region r) { 169 long regionSize = 0L; 170 for (Store store : r.getStores()) { 171 regionSize += store.getHFilesSize(); 172 } 173 if (LOG.isTraceEnabled()) { 174 LOG.trace("Size of " + r + " is " + regionSize); 175 } 176 return regionSize; 177 } 178 179 /** 180 * Reports the computed region sizes to the currently active Master. 181 * 182 * @param onlineRegionSizes The computed region sizes to report. 183 * @return {@code false} if FileSystemUtilizationChore should pause reporting to master, 184 * {@code true} otherwise. 185 */ 186 boolean reportRegionSizesToMaster(Map<RegionInfo,Long> onlineRegionSizes) { 187 return this.rs.reportRegionSizesForQuotas(onlineRegionSizes); 188 } 189 190 /** 191 * Extracts the period for the chore from the configuration. 192 * 193 * @param conf The configuration object. 194 * @return The configured chore period or the default value. 195 */ 196 static int getPeriod(Configuration conf) { 197 return conf.getInt(FS_UTILIZATION_CHORE_PERIOD_KEY, FS_UTILIZATION_CHORE_PERIOD_DEFAULT); 198 } 199 200 /** 201 * Extracts the initial delay for the chore from the configuration. 202 * 203 * @param conf The configuration object. 204 * @return The configured chore initial delay or the default value. 205 */ 206 static long getInitialDelay(Configuration conf) { 207 return conf.getLong(FS_UTILIZATION_CHORE_DELAY_KEY, FS_UTILIZATION_CHORE_DELAY_DEFAULT); 208 } 209 210 /** 211 * Extracts the time unit for the chore period and initial delay from the configuration. The 212 * configuration value for {@link #FS_UTILIZATION_CHORE_TIMEUNIT_KEY} must correspond to a 213 * {@link TimeUnit} value. 214 * 215 * @param conf The configuration object. 216 * @return The configured time unit for the chore period and initial delay or the default value. 217 */ 218 static TimeUnit getTimeUnit(Configuration conf) { 219 return TimeUnit.valueOf(conf.get(FS_UTILIZATION_CHORE_TIMEUNIT_KEY, 220 FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT)); 221 } 222}