1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.compactions;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
34 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
35 import org.apache.hadoop.hbase.regionserver.StoreFile;
36 import org.apache.hadoop.hbase.regionserver.StoreUtils;
37
38 import com.google.common.base.Preconditions;
39 import com.google.common.base.Predicate;
40 import com.google.common.collect.Collections2;
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class RatioBasedCompactionPolicy extends CompactionPolicy {
49 private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
50
51 public RatioBasedCompactionPolicy(Configuration conf,
52 StoreConfigInformation storeConfigInfo) {
53 super(conf, storeConfigInfo);
54 }
55
56 private ArrayList<StoreFile> getCurrentEligibleFiles(
57 ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
58
59 if (!filesCompacting.isEmpty()) {
60
61
62 StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
63 int idx = candidateFiles.indexOf(last);
64 Preconditions.checkArgument(idx != -1);
65 candidateFiles.subList(0, idx + 1).clear();
66 }
67 return candidateFiles;
68 }
69
70 public List<StoreFile> preSelectCompactionForCoprocessor(
71 final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
72 return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
73 }
74
75
76
77
78
79
80 public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
81 final List<StoreFile> filesCompacting, final boolean isUserCompaction,
82 final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
83
84 ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
85
86
87
88 int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
89 boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
90 >= storeConfigInfo.getBlockingFileCount();
91 candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
92 LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
93 filesCompacting.size() + " compacting, " + candidateSelection.size() +
94 " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
95
96
97 boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
98 if (!(forceMajor && isAllFiles)) {
99 candidateSelection = skipLargeFiles(candidateSelection);
100 isAllFiles = candidateFiles.size() == candidateSelection.size();
101 }
102
103
104
105 boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
106 || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
107 && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
108
109 boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
110 if (!isTryingMajor && !isAfterSplit) {
111
112 candidateSelection = filterBulk(candidateSelection);
113 candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
114 candidateSelection = checkMinFilesCriteria(candidateSelection);
115 }
116 candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
117
118 isAllFiles = (candidateFiles.size() == candidateSelection.size());
119 CompactionRequest result = new CompactionRequest(candidateSelection);
120 result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
121 result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
122 return result;
123 }
124
125
126
127
128
129
130
131 private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
132 int pos = 0;
133 while (pos < candidates.size() && !candidates.get(pos).isReference()
134 && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
135 ++pos;
136 }
137 if (pos > 0) {
138 LOG.debug("Some files are too large. Excluding " + pos
139 + " files from compaction candidates");
140 candidates.subList(0, pos).clear();
141 }
142 return candidates;
143 }
144
145
146
147
148
149
150 private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
151 candidates.removeAll(Collections2.filter(candidates,
152 new Predicate<StoreFile>() {
153 @Override
154 public boolean apply(StoreFile input) {
155 return input.excludeFromMinorCompaction();
156 }
157 }));
158 return candidates;
159 }
160
161
162
163
164
165
166 private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
167 boolean isUserCompaction, boolean isMajorCompaction) {
168 int excess = candidates.size() - comConf.getMaxFilesToCompact();
169 if (excess > 0) {
170 if (isMajorCompaction && isUserCompaction) {
171 LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
172 " files because of a user-requested major compaction");
173 } else {
174 LOG.debug("Too many admissible files. Excluding " + excess
175 + " files from compaction candidates");
176 candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
177 }
178 }
179 return candidates;
180 }
181
182
183
184
185
186 private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
187 int minFiles = comConf.getMinFilesToCompact();
188 if (candidates.size() < minFiles) {
189 if(LOG.isDebugEnabled()) {
190 LOG.debug("Not compacting files because we only have " + candidates.size() +
191 " files ready for compaction. Need " + minFiles + " to initiate.");
192 }
193 candidates.clear();
194 }
195 return candidates;
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228 ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
229 boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
230 if (candidates.isEmpty()) {
231 return candidates;
232 }
233
234
235 int start = 0;
236 double ratio = comConf.getCompactionRatio();
237 if (mayUseOffPeak) {
238 ratio = comConf.getCompactionRatioOffPeak();
239 LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
240 }
241
242
243 final int countOfFiles = candidates.size();
244 long[] fileSizes = new long[countOfFiles];
245 long[] sumSize = new long[countOfFiles];
246 for (int i = countOfFiles - 1; i >= 0; --i) {
247 StoreFile file = candidates.get(i);
248 fileSizes[i] = file.getReader().length();
249
250 int tooFar = i + comConf.getMaxFilesToCompact() - 1;
251 sumSize[i] = fileSizes[i]
252 + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
253 - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
254 }
255
256
257 while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
258 fileSizes[start] > Math.max(comConf.getMinCompactSize(),
259 (long) (sumSize[start + 1] * ratio))) {
260 ++start;
261 }
262 if (start < countOfFiles) {
263 LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
264 + " files from " + countOfFiles + " candidates");
265 } else if (mayBeStuck) {
266
267 int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
268 if (filesToLeave >= 0) {
269 start = filesToLeave;
270 }
271 }
272 candidates.subList(0, start).clear();
273 return candidates;
274 }
275
276
277
278
279
280 @Override
281 public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
282 throws IOException {
283 boolean result = false;
284 long mcTime = getNextMajorCompactTime(filesToCompact);
285 if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
286 return result;
287 }
288
289 long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
290 long now = System.currentTimeMillis();
291 if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
292
293 long cfTtl = this.storeConfigInfo.getStoreFileTtl();
294 if (filesToCompact.size() == 1) {
295
296 StoreFile sf = filesToCompact.iterator().next();
297 Long minTimestamp = sf.getMinimumTimestamp();
298 long oldest = (minTimestamp == null)
299 ? Long.MIN_VALUE
300 : now - minTimestamp.longValue();
301 if (sf.isMajorCompaction() &&
302 (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
303 float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
304 RSRpcServices.getHostname(comConf.conf, false)
305 );
306 if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
307 if (LOG.isDebugEnabled()) {
308 LOG.debug("Major compaction triggered on only store " + this +
309 "; to make hdfs blocks local, current blockLocalityIndex is " +
310 blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
311 ")");
312 }
313 result = true;
314 } else {
315 if (LOG.isDebugEnabled()) {
316 LOG.debug("Skipping major compaction of " + this +
317 " because one (major) compacted file only, oldestTime " +
318 oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
319 blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
320 ")");
321 }
322 }
323 } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
324 LOG.debug("Major compaction triggered on store " + this +
325 ", because keyvalues outdated; time since last major compaction " +
326 (now - lowTimestamp) + "ms");
327 result = true;
328 }
329 } else {
330 if (LOG.isDebugEnabled()) {
331 LOG.debug("Major compaction triggered on store " + this +
332 "; time since last major compaction " + (now - lowTimestamp) + "ms");
333 }
334 result = true;
335 }
336 }
337 return result;
338 }
339
340
341
342
343 private final Random random = new Random();
344
345
346
347
348
349 public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
350
351 long ret = comConf.getMajorCompactionPeriod();
352 if (ret > 0) {
353
354 double jitterPct = comConf.getMajorCompactionJitter();
355 if (jitterPct > 0) {
356 long jitter = Math.round(ret * jitterPct);
357
358 Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
359 if (seed != null) {
360
361 double rnd = -1;
362 synchronized (this) {
363 this.random.setSeed(seed);
364 rnd = this.random.nextDouble();
365 }
366 ret += jitter - Math.round(2L * jitter * rnd);
367 } else {
368 ret = 0;
369 }
370 }
371 }
372 return ret;
373 }
374
375
376
377
378
379 @Override
380 public boolean throttleCompaction(long compactionSize) {
381 return compactionSize > comConf.getThrottlePoint();
382 }
383
384 public boolean needsCompaction(final Collection<StoreFile> storeFiles,
385 final List<StoreFile> filesCompacting) {
386 int numCandidates = storeFiles.size() - filesCompacting.size();
387 return numCandidates >= comConf.getMinFilesToCompact();
388 }
389 }