1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.ScheduledChore;
29 import org.apache.hadoop.hbase.Stoppable;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
48 public class PressureAwareCompactionThroughputController extends Configured implements
49 CompactionThroughputController, Stoppable {
50
51 private final static Log LOG = LogFactory
52 .getLog(PressureAwareCompactionThroughputController.class);
53
54 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
55 "hbase.hstore.compaction.throughput.higher.bound";
56
57 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
58 20L * 1024 * 1024;
59
60 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
61 "hbase.hstore.compaction.throughput.lower.bound";
62
63 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
64 10L * 1024 * 1024;
65
66 public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
67 "hbase.hstore.compaction.throughput.offpeak";
68
69 private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
70
71 public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
72 "hbase.hstore.compaction.throughput.tune.period";
73
74 private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
75
76
77
78
79 private static final class ActiveCompaction {
80
81 private final long startTime;
82
83 private long lastControlTime;
84
85 private long lastControlSize;
86
87 private long totalSize;
88
89 private long numberOfSleeps;
90
91 private long totalSleepTime;
92
93
94 private long lastLogTime;
95
96 ActiveCompaction() {
97 long currentTime = EnvironmentEdgeManager.currentTime();
98 this.startTime = currentTime;
99 this.lastControlTime = currentTime;
100 this.lastLogTime = currentTime;
101 }
102 }
103
104 private long maxThroughputHigherBound;
105
106 private long maxThroughputLowerBound;
107
108 private long maxThroughputOffpeak;
109
110 private OffPeakHours offPeakHours;
111
112 private long controlPerSize;
113
114 private int tuningPeriod;
115
116 volatile double maxThroughput;
117
118 private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
119 new ConcurrentHashMap<String, ActiveCompaction>();
120
121 @Override
122 public void setup(final RegionServerServices server) {
123 server.getChoreService().scheduleChore(
124 new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
125
126 @Override
127 protected void chore() {
128 tune(server.getCompactionPressure());
129 }
130 });
131 }
132
133 private void tune(double compactionPressure) {
134 double maxThroughputToSet;
135 if (compactionPressure > 1.0) {
136
137 maxThroughputToSet = Double.MAX_VALUE;
138 } else if (offPeakHours.isOffPeakHour()) {
139 maxThroughputToSet = maxThroughputOffpeak;
140 } else {
141
142
143 maxThroughputToSet =
144 maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
145 * compactionPressure;
146 }
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
149 + throughputDesc(maxThroughputToSet));
150 }
151 this.maxThroughput = maxThroughputToSet;
152 }
153
154 @Override
155 public void setConf(Configuration conf) {
156 super.setConf(conf);
157 if (conf == null) {
158 return;
159 }
160 this.maxThroughputHigherBound =
161 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
162 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
163 this.maxThroughputLowerBound =
164 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
165 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
166 this.maxThroughputOffpeak =
167 conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
168 DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
169 this.offPeakHours = OffPeakHours.getInstance(conf);
170 this.controlPerSize = this.maxThroughputLowerBound;
171 this.maxThroughput = this.maxThroughputLowerBound;
172 this.tuningPeriod =
173 getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
174 DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
175 LOG.info("Compaction throughput configurations, higher bound: "
176 + throughputDesc(maxThroughputHigherBound) + ", lower bound "
177 + throughputDesc(maxThroughputLowerBound) + ", off peak: "
178 + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
179 }
180
181 private String throughputDesc(long deltaSize, long elapsedTime) {
182 return throughputDesc((double) deltaSize / elapsedTime * 1000);
183 }
184
185 private String throughputDesc(double speed) {
186 if (speed >= 1E15) {
187 return "unlimited";
188 } else {
189 return String.format("%.2f MB/sec", speed / 1024 / 1024);
190 }
191 }
192
193 @Override
194 public void start(String compactionName) {
195 activeCompactions.put(compactionName, new ActiveCompaction());
196 }
197
198 @Override
199 public long control(String compactionName, long size) throws InterruptedException {
200 ActiveCompaction compaction = activeCompactions.get(compactionName);
201 compaction.totalSize += size;
202 long deltaSize = compaction.totalSize - compaction.lastControlSize;
203 if (deltaSize < controlPerSize) {
204 return 0;
205 }
206 long now = EnvironmentEdgeManager.currentTime();
207 double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
208 long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000);
209 long elapsedTime = now - compaction.lastControlTime;
210 compaction.lastControlSize = compaction.totalSize;
211 if (elapsedTime >= minTimeAllowed) {
212 compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
213 return 0;
214 }
215
216 long sleepTime = minTimeAllowed - elapsedTime;
217 if (LOG.isDebugEnabled()) {
218
219 if (now - compaction.lastLogTime > 60L * 1000) {
220 LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
221 + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
222 + throughputDesc(maxThroughputPerCompaction) + ", already slept "
223 + compaction.numberOfSleeps + " time(s) and total slept time is "
224 + compaction.totalSleepTime + " ms till now.");
225 compaction.lastLogTime = now;
226 }
227 }
228 Thread.sleep(sleepTime);
229 compaction.numberOfSleeps++;
230 compaction.totalSleepTime += sleepTime;
231 compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
232 return sleepTime;
233 }
234
235 @Override
236 public void finish(String compactionName) {
237 ActiveCompaction compaction = activeCompactions.remove(compactionName);
238 long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
239 LOG.info(compactionName + " average throughput is "
240 + throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
241 + compaction.numberOfSleeps + " time(s) and total slept time is "
242 + compaction.totalSleepTime + " ms. " + activeCompactions.size()
243 + " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
244 }
245
246 private volatile boolean stopped = false;
247
248 @Override
249 public void stop(String why) {
250 stopped = true;
251 }
252
253 @Override
254 public boolean isStopped() {
255 return stopped;
256 }
257
258 @Override
259 public String toString() {
260 return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput)
261 + ", activeCompactions=" + activeCompactions.size() + "]";
262 }
263 }