/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration and the provisional file selection that
 * it's given to produce the list of suitable candidates for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
                                    StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
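    // A major compaction is only considered when the oldest file predates the
    // (possibly jittered) major compaction period returned by getNextMajorCompactTime().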
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      String regionInfo;
      if (this.storeConfigInfo instanceof HStore) {
        regionInfo = ((HStore)this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
      } else {
        regionInfo = this.toString();
      }
      // Major compaction time has elapsed.
      long cfTTL = HConstants.FOREVER;
      if (this.storeConfigInfo != null) {
        cfTTL = this.storeConfigInfo.getStoreFileTtl();
      }
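      // With a single store file that is already a major compaction result and still
      // within TTL, only re-compact it to restore HDFS block locality; otherwise a
      // single file is re-compacted to drop TTL-expired cells, and multiple files are
      // compacted simply because the major compaction period has elapsed.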
      if (filesToCompact.size() == 1) {
        // Single file
        HStoreFile sf = filesToCompact.iterator().next();
        OptionalLong minTimestamp = sf.getMinimumTimestamp();
        long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
        if (sf.isMajorCompactionResult() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution()
            .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on store " + regionInfo
              + " with a single store file; to make HDFS blocks local, current"
              + " blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + regionInfo
              + " because the single store file is already major-compacted, oldestTime " + oldest
              + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + regionInfo
            + " because keyvalues are outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + regionInfo
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(
      ArrayList<HStoreFile> candidateSelection, boolean tryingMajor, boolean mayUseOffPeak,
      boolean mayBeStuck) throws IOException {
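    // For minor compactions, narrow the provisional selection: drop bulk-load files if
    // configured, apply the ratio-based selection, then enforce the minimum file count.
    // A major compaction keeps the provisional selection unchanged.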
    if (!tryingMajor) {
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection,
        comConf.getMinFilesToCompact());
    }
    return new CompactionRequestImpl(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm:
   * choose a CompactSelection from the candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets the compaction criteria:
   * (1) a recently-flushed, small file (i.e. &lt;= minCompactSize)
   * OR
   * (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet these criteria.
   * <p>
   * Additional note:
   * If the number of candidates greatly exceeds maxFilesToCompact, we will recurse on
   * compact().  Consider the oldest files first to avoid a situation where we always
   * compact [end-threshold,end), in which case the last file would become an aggregate
   * of all previous compactions.
   * <p>
   * Normal skew:
   * <pre>
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   * </pre>
   * @param candidates pre-filtered candidate files, ordered from oldest to newest
   * @param mayUseOffPeak whether the off-peak compaction ratio may be used
   * @param mayBeStuck whether the store may be stuck with too many files
   * @return the filtered subset of candidates to compact
   */
  protected ArrayList<HStoreFile> applyCompactionPolicy(ArrayList<HStoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
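    // Walk from newest to oldest so each sumSize[i] can be derived from sumSize[i + 1]:
    // sumSize[i] holds the total size of the window fileSizes[i, i + maxFilesToCompact - 1),
    // clipped at the end of the array.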
    for (int i = countOfFiles - 1; i >= 0; --i) {
      HStoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

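    // Skip files from the oldest end while a file is larger than both minCompactSize and
    // ratio * (total size of the newer files in its window); the remaining suffix
    // [start, countOfFiles) becomes the minor compaction selection.
    //
    // Illustrative example (hypothetical sizes, assuming ratio = 1.2, a small minCompactSize
    // and a window covering all newer files): for candidate sizes 100, 25, 12, 12 (oldest to
    // newest), 100 > 1.2 * (25 + 12 + 12) = 58.8 so it is skipped, while 25 <= 1.2 * (12 + 12)
    // = 28.8, so 25, 12 and 12 are selected.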
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * A heuristic method to decide whether to schedule a compaction request.
   * @param storeFiles files in the store
   * @param filesCompacting files already scheduled for compaction
   * @return true if a compaction request should be scheduled
   */
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }

  /**
   * Overwrites the configured minimum number of files required to trigger a compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}