/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration and the provisional file selection that
 * it's given to produce the list of suitable candidates for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
      StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      String regionInfo;
      // instanceof already handles null, so no separate null check is needed.
      if (this.storeConfigInfo instanceof HStore) {
        regionInfo = ((HStore) this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
      } else {
        regionInfo = this.toString();
      }
      // Major compaction time has elapsed.
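      // From here on we decide whether the elapsed interval is actually worth acting
      // on: a lone file that is already a major-compaction result is recompacted only
      // to restore HDFS block locality or to purge TTL-expired cells, whereas a
      // multi-file candidate set always triggers a major compaction.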
      long cfTTL = HConstants.FOREVER;
      if (this.storeConfigInfo != null) {
        cfTTL = this.storeConfigInfo.getStoreFileTtl();
      }
      if (filesToCompact.size() == 1) {
        // Single file
        HStoreFile sf = filesToCompact.iterator().next();
        OptionalLong minTimestamp = sf.getMinimumTimestamp();
        long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
        if (sf.isMajorCompactionResult() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
          float blockLocalityIndex =
              sf.getHDFSBlockDistribution().getBlockLocalityIndex(
                  RSRpcServices.getHostname(comConf.conf, false));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on only store " + regionInfo
                + "; to make hdfs blocks local, current blockLocalityIndex is "
                + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + regionInfo
                + " because one (major) compacted file only, oldestTime " + oldest
                + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
                + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + regionInfo
              + ", because keyvalues outdated; time since last major compaction "
              + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + regionInfo
            + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection =
          checkMinFilesCriteria(candidateSelection, comConf.getMinFilesToCompact());
    }
    return new CompactionRequestImpl(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm:
   * choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize)
   * OR
   * (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet this criterion.
   * <p/>
   * Additional Note:
   * If fileSizes.size() >> maxFilesToCompact, we will recurse on
   * compact(). Consider the oldest files first to avoid a
   * situation where we always compact [end-threshold,end). Then, the
   * last file becomes an aggregate of the previous compactions.
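   * <p/>
   * Worked example (illustrative sizes, not defaults; assumes minCompactSize is
   * below these values, hbase.hstore.compaction.min is at most 3, and
   * maxFilesToCompact is large enough not to cap the sums): with ratio = 1.0 and
   * candidate sizes [100, 50, 23, 12, 12], oldest first, 100 is skipped because
   * 100 > 1.0 * (50 + 23 + 12 + 12) = 97, and 50 is skipped because
   * 50 > 1.0 * (23 + 12 + 12) = 47, but 23 qualifies because
   * 23 <= 1.0 * (12 + 12) = 24, so [23, 12, 12] are selected.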
   *
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   * @param candidates pre-filtered candidate files, ordered oldest to newest
   * @return filtered subset
   */
  protected ArrayList<HStoreFile> applyCompactionPolicy(ArrayList<HStoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      HStoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
        fileSizes[start] > Math.max(comConf.getMinCompactSize(),
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
          + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * A heuristic method to decide whether to schedule a compaction request.
   * @param storeFiles files in the store.
   * @param filesCompacting files being scheduled to compact.
   * @return true to schedule a request.
   */
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
      List<HStoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }

  /**
   * Override the minimum threshold for compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}
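
// A minimal usage sketch (illustrative only; this wiring is normally done by the
// store engine via CompactionConfiguration, and "store", "storeFiles", and
// "filesCompacting" below stand for a real StoreConfigInformation implementation
// such as HStore and its file collections, which are not provided by this file):
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setFloat("hbase.hstore.compaction.ratio", 1.2f); // minor-selection ratio
//   conf.setInt("hbase.hstore.compaction.min", 3);        // min files per compaction
//   conf.setInt("hbase.hstore.compaction.max", 10);       // max files per compaction
//   RatioBasedCompactionPolicy policy = new RatioBasedCompactionPolicy(conf, store);
//   if (policy.needsCompaction(storeFiles, filesCompacting)) {
//     // Selection then proceeds through the SortedCompactionPolicy entry points,
//     // which call createCompactionRequest(...) above.
//   }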