001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.util.HashMap;
020import java.util.HashSet;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.regionserver.HRegion;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.hadoop.hbase.regionserver.Region;
032import org.apache.hadoop.hbase.regionserver.Store;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given {@link HRegionServer}.
040 */
041@InterfaceAudience.Private
042public class FileSystemUtilizationChore extends ScheduledChore {
043  private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtilizationChore.class);
044  static final String FS_UTILIZATION_CHORE_PERIOD_KEY = "hbase.regionserver.quotas.fs.utilization.chore.period";
045  static final int FS_UTILIZATION_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
046
047  static final String FS_UTILIZATION_CHORE_DELAY_KEY = "hbase.regionserver.quotas.fs.utilization.chore.delay";
048  static final long FS_UTILIZATION_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
049
050  static final String FS_UTILIZATION_CHORE_TIMEUNIT_KEY = "hbase.regionserver.quotas.fs.utilization.chore.timeunit";
051  static final String FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
052
053  static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY = "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis";
054  static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L;
055
056  private int numberOfCyclesToSkip = 0, prevNumberOfCyclesToSkip = 0;
057  private static final int CYCLE_UPPER_BOUND = 32;
058
059  private final HRegionServer rs;
060  private final long maxIterationMillis;
061  private Iterator<Region> leftoverRegions;
062
063  public FileSystemUtilizationChore(HRegionServer rs) {
064    super(FileSystemUtilizationChore.class.getSimpleName(), rs, getPeriod(rs.getConfiguration()),
065        getInitialDelay(rs.getConfiguration()), getTimeUnit(rs.getConfiguration()));
066    this.rs = rs;
067    this.maxIterationMillis = rs.getConfiguration().getLong(
068        FS_UTILIZATION_MAX_ITERATION_DURATION_KEY, FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT);
069  }
070
071  @Override
072  protected void chore() {
073    if (numberOfCyclesToSkip > 0) {
074      numberOfCyclesToSkip--;
075      return;
076    }
077    final Map<RegionInfo, Long> onlineRegionSizes = new HashMap<>();
078    final Set<Region> onlineRegions = new HashSet<>(rs.getRegions());
079    // Process the regions from the last run if we have any. If we are somehow having difficulty
080    // processing the Regions, we want to avoid creating a backlog in memory of Region objs.
081    Iterator<Region> oldRegionsToProcess = getLeftoverRegions();
082    final Iterator<Region> iterator;
083    final boolean processingLeftovers;
084    if (oldRegionsToProcess == null) {
085      iterator = onlineRegions.iterator();
086      processingLeftovers = false;
087    } else {
088      iterator = oldRegionsToProcess;
089      processingLeftovers = true;
090    }
091    // Reset the leftoverRegions and let the loop re-assign if necessary.
092    setLeftoverRegions(null);
093    long regionSizesCalculated = 0L;
094    long offlineRegionsSkipped = 0L;
095    long skippedSplitParents = 0L;
096    long skippedRegionReplicas = 0L;
097    final long start = EnvironmentEdgeManager.currentTime();
098    while (iterator.hasNext()) {
099      // Make sure this chore doesn't hog the thread.
100      long timeRunning = EnvironmentEdgeManager.currentTime() - start;
101      if (timeRunning > maxIterationMillis) {
102        LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the"
103            + " maximum iteration configuration value. Will process remaining iterators"
104            + " on a subsequent invocation.");
105        setLeftoverRegions(iterator);
106        break;
107      }
108
109      final Region region = iterator.next();
110      // If we're processing leftover regions, the region may no-longer be online.
111      // If so, we can skip it.
112      if (processingLeftovers && !onlineRegions.contains(region)) {
113        offlineRegionsSkipped++;
114        continue;
115      }
116      // Avoid computing the size of regions which are the parent of split.
117      if (region.getRegionInfo().isSplitParent()) {
118        skippedSplitParents++;
119        continue;
120      }
121      // Avoid computing the size of region replicas.
122      if (RegionInfo.DEFAULT_REPLICA_ID != region.getRegionInfo().getReplicaId()) {
123        skippedRegionReplicas++;
124        continue;
125      }
126      final long sizeInBytes = computeSize(region);
127      onlineRegionSizes.put(region.getRegionInfo(), sizeInBytes);
128      regionSizesCalculated++;
129    }
130    if (LOG.isTraceEnabled()) {
131      LOG.trace("Computed the size of " + regionSizesCalculated + " Regions. Skipped computation"
132          + " of " + offlineRegionsSkipped + " regions due to not being online on this RS, "
133          + skippedSplitParents + " regions due to being the parent of a split, and"
134          + skippedRegionReplicas + " regions due to being region replicas.");
135    }
136    if (!reportRegionSizesToMaster(onlineRegionSizes)) {
137      // backoff reporting
138      numberOfCyclesToSkip = prevNumberOfCyclesToSkip > 0 ? 2 * prevNumberOfCyclesToSkip : 1;
139      if (numberOfCyclesToSkip > CYCLE_UPPER_BOUND) {
140        numberOfCyclesToSkip = CYCLE_UPPER_BOUND;
141      }
142      prevNumberOfCyclesToSkip = numberOfCyclesToSkip;
143    }
144  }
145
146  /**
147   * Returns an {@link Iterator} over the Regions which were skipped last invocation of the chore.
148   *
149   * @return Regions from the previous invocation to process, or null.
150   */
151  Iterator<Region> getLeftoverRegions() {
152    return leftoverRegions;
153  }
154
155  /**
156   * Sets a new collection of Regions as leftovers.
157   */
158  void setLeftoverRegions(Iterator<Region> newLeftovers) {
159    this.leftoverRegions = newLeftovers;
160  }
161
162  /**
163   * Computes total FileSystem size for the given {@link Region}.
164   *
165   * @param r The region
166   * @return The size, in bytes, of the Region.
167   */
168  long computeSize(Region r) {
169    long regionSize = 0L;
170    for (Store store : r.getStores()) {
171      regionSize += store.getHFilesSize();
172    }
173    if (LOG.isTraceEnabled()) {
174      LOG.trace("Size of " + r + " is " + regionSize);
175    }
176    return regionSize;
177  }
178
179  /**
180   * Reports the computed region sizes to the currently active Master.
181   *
182   * @param onlineRegionSizes The computed region sizes to report.
183   * @return {@code false} if FileSystemUtilizationChore should pause reporting to master,
184   *    {@code true} otherwise.
185   */
186  boolean reportRegionSizesToMaster(Map<RegionInfo,Long> onlineRegionSizes) {
187    return this.rs.reportRegionSizesForQuotas(onlineRegionSizes);
188  }
189
190  /**
191   * Extracts the period for the chore from the configuration.
192   *
193   * @param conf The configuration object.
194   * @return The configured chore period or the default value.
195   */
196  static int getPeriod(Configuration conf) {
197    return conf.getInt(FS_UTILIZATION_CHORE_PERIOD_KEY, FS_UTILIZATION_CHORE_PERIOD_DEFAULT);
198  }
199
200  /**
201   * Extracts the initial delay for the chore from the configuration.
202   *
203   * @param conf The configuration object.
204   * @return The configured chore initial delay or the default value.
205   */
206  static long getInitialDelay(Configuration conf) {
207    return conf.getLong(FS_UTILIZATION_CHORE_DELAY_KEY, FS_UTILIZATION_CHORE_DELAY_DEFAULT);
208  }
209
210  /**
211   * Extracts the time unit for the chore period and initial delay from the configuration. The
212   * configuration value for {@link #FS_UTILIZATION_CHORE_TIMEUNIT_KEY} must correspond to a
213   * {@link TimeUnit} value.
214   *
215   * @param conf The configuration object.
216   * @return The configured time unit for the chore period and initial delay or the default value.
217   */
218  static TimeUnit getTimeUnit(Configuration conf) {
219    return TimeUnit.valueOf(conf.get(FS_UTILIZATION_CHORE_TIMEUNIT_KEY,
220        FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT));
221  }
222}