/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The default algorithm for selecting files for compaction. Combines the compaction configuration
 * and the provisional file selection that it's given to produce the list of suitable candidates for
 * compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Logger LOG = LoggerFactory.getLogger(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  /*
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      String regionInfo;
      if (this.storeConfigInfo instanceof HStore) {
        regionInfo = ((HStore) this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
      } else {
        regionInfo = this.toString();
      }
      // Major compaction time has elapsed.
      long cfTTL = HConstants.FOREVER;
      if (this.storeConfigInfo != null) {
        cfTTL = this.storeConfigInfo.getStoreFileTtl();
      }
      if (filesToCompact.size() == 1) {
        // Single file
        HStoreFile sf = filesToCompact.iterator().next();
        OptionalLong minTimestamp = sf.getMinimumTimestamp();
        long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
        if (sf.isMajorCompactionResult() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution()
            .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on only store " + regionInfo
              + "; to make hdfs blocks local, current blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + regionInfo
              + " because one (major) compacted file only, oldestTime " + oldest + "ms is < TTL="
              + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex + " (min "
              + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + regionInfo
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + regionInfo
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }
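
  // Illustrative walk-through of the decision above (hypothetical values, a
  // sketch rather than output from a real cluster): with the major compaction
  // period mcTime resolved to 7 days and a store whose oldest file was written
  // 8 days ago, lowTimestamp < now - mcTime holds, so the store is due. A
  // store holding several files then has a major compaction requested
  // unconditionally; a store holding a single file that is already the result
  // of a major compaction is recompacted only to restore HDFS block locality
  // (when its locality index falls below comConf.getMinLocalityToForceCompact())
  // or, when the column family has a finite TTL, to purge expired cells.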

  @Override
  protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
      filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection =
        checkMinFilesCriteria(candidateSelection, comConf.getMinFilesToCompact());
    }
    return new CompactionRequestImpl(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm: choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration. Start at the oldest file and stop
   * when you find the first file that meets compaction criteria: (1) a recently-flushed, small file
   * (i.e. <= minCompactSize) OR (2) within the compactRatio of sum(newer_files). Given normal skew,
   * any newer files will also meet these criteria.
   * <p/>
   * Additional Note: If fileSizes.size() >> maxFilesToCompact, we will recurse on compact().
   * Consider the oldest files first to avoid a situation where we always compact
   * [end-threshold,end). Then, the last file becomes an aggregate of the previous compactions.
   * <p/>
   * Normal skew:
   *
   * <pre>
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
   * </pre>
   *
   * @param candidates pre-filtrate
   * @return filtered subset
   */
  protected ArrayList<HStoreFile> applyCompactionPolicy(ArrayList<HStoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      HStoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    while (
      countOfFiles - start >= comConf.getMinFilesToCompact() && fileSizes[start]
          > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))
    ) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }
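
  // Worked example of the selection loop above (hypothetical sizes): with
  // ratio = 1.2, minCompactSize = 8, minFilesToCompact = 3, and
  // maxFilesToCompact >= 6 (so no window truncation), candidates sized
  // [100, 33, 30, 10, 2] (oldest first) yield sumSize = [175, 75, 42, 12, 2].
  // At start = 0: 100 > max(8, (long) (1.2 * 75)) = 90, so the 100 file is
  // skipped. At start = 1: 33 <= max(8, (long) (1.2 * 42)) = 50, so the loop
  // stops and [33, 30, 10, 2] is selected; each remaining file is within the
  // ratio of the total size of the files newer than it.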

  /**
   * A heuristic method to decide whether to schedule a compaction request.
   * @param storeFiles files in the store.
   * @param filesCompacting files being scheduled to compact.
   * @return true to schedule a request.
   */
  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }

  /**
   * Overrides the minimum threshold for compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}
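
// Usage sketch (assumption: key name and default policy as defined by
// DefaultStoreEngine; verify against the HBase version in use). Stores default
// to ExploringCompactionPolicy, a subclass of this policy; to select this
// policy explicitly, set in hbase-site.xml:
//
//   <property>
//     <name>hbase.hstore.defaultengine.compactionpolicy.class</name>
//     <value>org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy</value>
//   </property>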