/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The default algorithm for selecting files for compaction. Combines the compaction configuration
 * and the provisional file selection that it's given to produce the list of suitable candidates
 * for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }
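
  // Illustrative usage sketch (not part of the policy itself): a store normally obtains this
  // policy through its configured compaction engine rather than constructing it directly, but
  // the public surface can be exercised roughly as follows. The variables "store" (an HStore)
  // and "filesCompacting" (files already scheduled for compaction) are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   RatioBasedCompactionPolicy policy = new RatioBasedCompactionPolicy(conf, store);
  //   if (policy.needsCompaction(store.getStorefiles(), filesCompacting)) {
  //     boolean major = policy.shouldPerformMajorCompaction(store.getStorefiles());
  //     // ... build and submit a compaction request accordingly
  //   }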

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
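    // Note: getNextMajorCompactTime() is expected to return the configured periodic major
    // compaction interval (hbase.hregion.majorcompaction, plus any configured jitter); a value
    // of 0 disables time-based major compactions, which is why it short-circuits below.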
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      String regionInfo;
      if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {
        regionInfo = ((HStore) this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
      } else {
        regionInfo = this.toString();
      }
      // Major compaction time has elapsed.
      long cfTTL = HConstants.FOREVER;
      if (this.storeConfigInfo != null) {
        cfTTL = this.storeConfigInfo.getStoreFileTtl();
      }
      if (filesToCompact.size() == 1) {
        // Single file
        HStoreFile sf = filesToCompact.iterator().next();
        OptionalLong minTimestamp = sf.getMinimumTimestamp();
        long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
        if (sf.isMajorCompactionResult() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution()
            .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on only store " + regionInfo
              + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + regionInfo
              + " because one (major) compacted file only, oldestTime " + oldest + "ms is < TTL="
              + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex + " (min "
              + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + regionInfo
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + regionInfo
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
      filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection =
        checkMinFilesCriteria(candidateSelection, comConf.getMinFilesToCompact());
    }
    return new CompactionRequestImpl(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm: choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration. Start at the oldest file and stop
   * when you find the first file that meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize), OR
   * (2) within the compactRatio of sum(newer_files).
   * Given normal skew, any newer files will also meet these criteria.
   * <p/>
   * Additional Note: If fileSizes.size() >> maxFilesToCompact, we will recurse on compact().
   * Consider the oldest files first to avoid a situation where we always compact
   * [end-threshold,end). Then, the last file becomes an aggregate of the previous compactions.
   *
   * <pre>
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   * </pre>
   * @param candidates pre-filtered candidate files
   * @return filtered subset
   */
  protected ArrayList<HStoreFile> applyCompactionPolicy(ArrayList<HStoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }
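    // Off-peak hours are typically configured via hbase.offpeak.start.hour and
    // hbase.offpeak.end.hour; the off-peak ratio is usually set higher than the normal
    // compaction ratio so that larger compactions are allowed while the cluster is idle.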

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      HStoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

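    // The loop below skips over files that are "too big" relative to the files newer than them:
    // the file at position start is skipped when it is larger than both minCompactSize and
    // ratio * (windowed sum of the newer files). Illustrative example, assuming the default
    // ratio of 1.2, a small minCompactSize and maxFilesToCompact >= 5: for sizes
    // [100, 50, 23, 12, 12], 100 <= 1.2 * (50 + 23 + 12 + 12) = 116.4, so selection stops at
    // the first (oldest) file and all five candidates are kept.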
    while (
      countOfFiles - start >= comConf.getMinFilesToCompact() && fileSizes[start]
          > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))
    ) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * A heuristic method to decide whether to schedule a compaction request
   * @param storeFiles      files in the store.
   * @param filesCompacting files being scheduled to compact.
   * @return true to schedule a request.
   */
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
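    // For example, with 5 store files, 2 of them already compacting, and a minimum of 3 files to
    // compact, 5 - 2 = 3 >= 3, so a compaction request would be scheduled.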
    return numCandidates >= comConf.getMinFilesToCompact();
  }

  /**
   * Overwrite the configured minimum number of files needed to trigger a compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}