/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
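/**
 * The default algorithm for selecting files for compaction. Combines the
 * compaction configuration and the provisional file selection that it is
 * given to produce the selection of files to compact.
 */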
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
      StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

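  /**
   * @param candidateFiles candidate files, ordered from oldest to newest
   * @param filesCompacting files currently being compacted
   * @return the candidates that are not blocked by a compaction in flight
   */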
  private ArrayList<StoreFile> getCurrentEligibleFiles(
      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
    // candidateFiles is all store files, including any already queued for compaction.
    if (!filesCompacting.isEmpty()) {
      // Exclude all candidates up to and including the newest file that is
      // currently being compacted; only newer files remain eligible, which
      // keeps the selection contiguous with the in-flight compaction.
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }
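  /**
   * Selects the files eligible for compaction on behalf of a coprocessor,
   * before the policy itself narrows the selection further.
   */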
  public List<StoreFile> preSelectCompactionForCoprocessor(
      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
  }
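  /**
   * @param candidateFiles candidate files, ordered from oldest to newest. All files in the store.
   * @param filesCompacting files currently being compacted
   * @param isUserCompaction true if the compaction was requested by the user
   * @param mayUseOffPeak true if the off-peak compaction ratio may be applied
   * @param forceMajor true if a major compaction is being forced
   * @return a subset copy of the candidate list that meets the compaction criteria
   * @throws IOException if the selection fails
   */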
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary selection, subject to the filters applied below.
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Estimate whether the store may be "stuck": even counting the in-flight
    // compaction as the single file it will produce, we would still be at or
    // above the blocking file count.
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we cannot compact all files, we cannot do a major compaction anyway.
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    if (!(forceMajor && isAllFiles)) {
      candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if the user requested one on all files, or if a
    // major compaction is forced or due and the selection is small enough to
    // compact in a single pass.
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
        || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
            && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // References left behind by a split must always be compacted, so skip the
    // minor-selection filters when any are present.
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
    if (!isTryingMajor && !isAfterSplit) {
      // We are not compacting all files; see which files are applicable.
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
    // Now that the file list is final, determine whether this is a major/all-files compaction.
    isAllFiles = (candidateFiles.size() == candidateSelection.size());
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
    return result;
  }
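  /**
   * Skips, from the oldest end of the list, files whose size exceeds
   * maxCompactSize, stopping at the first reference file: references must
   * always remain compactable so they can eventually be removed.
   * @param candidates pre-filtrate, ordered from oldest to newest
   * @return filtered subset
   */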
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
      boolean mayUseOffpeak) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
        && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }
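  /**
   * Excludes files that are flagged to be skipped by minor compactions
   * (e.g. bulk-loaded files configured for exclusion).
   * @param candidates pre-filtrate
   * @return filtered subset
   */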
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {
      @Override
      public boolean apply(StoreFile input) {
        return input.excludeFromMinorCompaction();
      }
    }));
    return candidates;
  }
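  /**
   * Truncates the selection to at most maxFilesToCompact files, keeping the
   * oldest, unless a user-requested major compaction is allowed to exceed the
   * limit.
   * @param candidates pre-filtrate
   * @return filtered subset
   */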
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
            + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }
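  /**
   * Drops the whole selection if it contains fewer than minFilesToCompact files.
   * @param candidates pre-filtrate
   * @return the input list, emptied if it was too small to compact
   */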
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
            " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }
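  /**
   * Default minor compaction selection algorithm: choose a contiguous run of
   * files from the candidates.
   * <p>
   * Start at the oldest file and stop at the first file that meets the
   * compaction criteria: a recently-flushed small file (i.e. one no larger
   * than minCompactSize), or a file whose size is within the compaction ratio
   * of the combined size of the newer files that would be compacted with it
   * (fileSize &lt;= ratio * sum(newerFileSizes), where the sum counts at most
   * maxFilesToCompact - 1 files). Given normal size skew, where older files
   * are larger, every file newer than the first qualifying one also
   * qualifies, so the selection is the whole tail of the candidate list.
   * <p>
   * Worked example (assuming minCompactSize is below all file sizes and
   * maxFilesToCompact is at least 5): for files of sizes 100, 50, 23, 12 and
   * 12, oldest to newest, with ratio 1.0, the file of size 100 is skipped
   * because 100 &gt; 1.0 * (50 + 23 + 12 + 12) = 97, the file of size 50 is
   * skipped because 50 &gt; 1.0 * (23 + 12 + 12) = 47, and selection starts
   * at 23 because 23 &lt;= 1.0 * (12 + 12) = 24; so 23, 12 and 12 are
   * compacted.
   * <p>
   * If nothing qualifies but the store may be stuck at the blocking file
   * count, the newest minFilesToCompact files are selected instead.
   * @param candidates pre-filtrate, ordered from oldest to newest
   * @param mayUseOffPeak whether the off-peak ratio may be used
   * @param mayBeStuck whether the store may be at the blocking file count
   * @return filtered subset of the candidates
   */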
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // We are doing a minor compaction; find the applicable files.
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // Get store file sizes for the incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // sumSize[i] covers fileSizes[i, i + maxFilesToCompact - 1), so that
      // start plus the files counted by sumSize[start + 1] never exceed
      // maxFilesToCompact files in total.
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // Skip leading files whose size exceeds both minCompactSize and the
    // ratio-scaled combined size of the newer files that would join them.
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
        fileSizes[start] > Math.max(comConf.getMinCompactSize(),
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
          + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }
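  /**
   * @param filesToCompact the files to compact; can be null
   * @return true if a major compaction should be run on these files
   * @throws IOException if store file data cannot be read
   */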
  @Override
  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // Use the lowest timestamp among the candidates as an approximation of
    // the time of the last major compaction.
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null)
            ? Long.MIN_VALUE
            : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() &&
            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
              RSRpcServices.getHostname(comConf.conf, false));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            // The file is already major-compacted and within TTL, but its
            // HDFS blocks are not local enough; recompact to restore locality.
            if (LOG.isDebugEnabled()) {
              LOG.debug("Major compaction triggered on only store " + this +
                  "; to make hdfs blocks local, current blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
            result = true;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Skipping major compaction of " + this +
                  " because one (major) compacted file only, oldestTime " +
                  oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
          }
        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
          LOG.debug("Major compaction triggered on store " + this +
              ", because keyvalues outdated; time since last major compaction " +
              (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }
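  /**
   * Used to calculate the jitter on the next major compaction time.
   */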
  private final Random random = new Random();
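  /**
   * @param filesToCompact the files to be compacted, used to seed the jitter
   * @return the delay, in milliseconds, before the next major compaction
   * should run, with deterministic jitter applied; 0 disables major compactions
   */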
  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
    // Period between major compactions, from hbase.hregion.majorcompaction.
    long ret = comConf.getMajorCompactionPeriod();
    if (ret > 0) {
      // Jitter fraction of the period, from hbase.hregion.majorcompaction.jitter.
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // Deterministic jitter avoids a major compaction storm on restart.
        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
        if (seed != null) {
          // Synchronized to ensure one user of the random instance at a time.
          double rnd = -1;
          synchronized (this) {
            this.random.setSeed(seed);
            rnd = this.random.nextDouble();
          }
          ret += jitter - Math.round(2L * jitter * rnd);
        } else {
          ret = 0; // A null seed means there are no store files; no major compaction.
        }
      }
    }
    return ret;
  }
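  /**
   * @param compactionSize total size of the files in some compaction
   * @return whether the compaction is large enough to be throttled into the
   * large-compaction pool
   */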
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }
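  /**
   * A heuristic check on whether a compaction request should be scheduled:
   * enough files must be present, not counting those already compacting.
   * @param storeFiles all files in the store
   * @param filesCompacting files currently being compacted
   * @return true if a compaction request should be submitted
   */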
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
}