1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
31 import org.apache.hadoop.hbase.regionserver.StoreFile;
32 import org.apache.hadoop.hbase.regionserver.StoreUtils;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
49
50 private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
51
52
53 public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
54 super(conf, storeConfigInfo);
55 }
56
57 @Override
58 public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
59 List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
60 boolean forceMajor) throws IOException {
61
62 if(forceMajor){
63 LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
64 }
65 boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
66 if(isAfterSplit){
67 LOG.info("Split detected, delegate selection to the parent policy.");
68 return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
69 mayUseOffPeak, forceMajor);
70 }
71
72
73 Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
74 CompactionRequest result = new CompactionRequest(toCompact);
75 return result;
76 }
77
78 @Override
79 public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
80 boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
81 if(isAfterSplit){
82 LOG.info("Split detected, delegate to the parent policy.");
83 return super.isMajorCompaction(filesToCompact);
84 }
85 return false;
86 }
87
88 @Override
89 public boolean needsCompaction(Collection<StoreFile> storeFiles,
90 List<StoreFile> filesCompacting) {
91 boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
92 if(isAfterSplit){
93 LOG.info("Split detected, delegate to the parent policy.");
94 return super.needsCompaction(storeFiles, filesCompacting);
95 }
96 return hasExpiredStores(storeFiles);
97 }
98
99
100
101
102
103
104
105
106 private boolean isEmptyStoreFile(StoreFile sf) {
107 return sf.getReader().getEntries() == 0;
108 }
109
110 private boolean hasExpiredStores(Collection<StoreFile> files) {
111 long currentTime = EnvironmentEdgeManager.currentTime();
112 for (StoreFile sf : files) {
113 if (isEmptyStoreFile(sf)) {
114 return true;
115 }
116
117 Long maxTs = sf.getReader().getMaxTimestamp();
118 long maxTtl = storeConfigInfo.getStoreFileTtl();
119 if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
120 continue;
121 } else {
122 return true;
123 }
124 }
125 return false;
126 }
127
128 private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
129 Collection<StoreFile> filesCompacting) {
130 long currentTime = EnvironmentEdgeManager.currentTime();
131 Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
132 for (StoreFile sf : files) {
133 if (isEmptyStoreFile(sf)) {
134 expiredStores.add(sf);
135 continue;
136 }
137
138 Long maxTs = sf.getReader().getMaxTimestamp();
139 long maxTtl = storeConfigInfo.getStoreFileTtl();
140 if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
141 continue;
142 } else if (filesCompacting == null || filesCompacting.contains(sf) == false) {
143 expiredStores.add(sf);
144 }
145 }
146 return expiredStores;
147 }
148 }