View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.compactions;
19  
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.conf.Configured;
27  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28  import org.apache.hadoop.hbase.ScheduledChore;
29  import org.apache.hadoop.hbase.Stoppable;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33  
34  /**
35   * A throughput controller which uses the follow schema to limit throughput
36   * <ul>
37   * <li>If compaction pressure is greater than 1.0, no limitation.</li>
38   * <li>In off peak hours, use a fixed throughput limitation
39   * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
40   * <li>In normal hours, the max throughput is tune between
41   * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
42   * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
43   * (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
44   * </ul>
45   * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
46   */
47  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
48  public class PressureAwareCompactionThroughputController extends Configured implements
49      CompactionThroughputController, Stoppable {
50  
51    private final static Log LOG = LogFactory
52        .getLog(PressureAwareCompactionThroughputController.class);
53  
54    public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
55        "hbase.hstore.compaction.throughput.higher.bound";
56  
57    private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
58        20L * 1024 * 1024;
59  
60    public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
61        "hbase.hstore.compaction.throughput.lower.bound";
62  
63    private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
64        10L * 1024 * 1024;
65  
66    public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
67        "hbase.hstore.compaction.throughput.offpeak";
68  
69    private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
70  
71    public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
72        "hbase.hstore.compaction.throughput.tune.period";
73  
74    private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
75  
76    /**
77     * Stores the information of one controlled compaction.
78     */
79    private static final class ActiveCompaction {
80  
81      private final long startTime;
82  
83      private long lastControlTime;
84  
85      private long lastControlSize;
86  
87      private long totalSize;
88  
89      private long numberOfSleeps;
90  
91      private long totalSleepTime;
92  
93      // prevent too many debug log
94      private long lastLogTime;
95  
96      ActiveCompaction() {
97        long currentTime = EnvironmentEdgeManager.currentTime();
98        this.startTime = currentTime;
99        this.lastControlTime = currentTime;
100       this.lastLogTime = currentTime;
101     }
102   }
103 
104   private long maxThroughputHigherBound;
105 
106   private long maxThroughputLowerBound;
107 
108   private long maxThroughputOffpeak;
109 
110   private OffPeakHours offPeakHours;
111 
112   private long controlPerSize;
113 
114   private int tuningPeriod;
115 
116   volatile double maxThroughput;
117 
118   private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
119       new ConcurrentHashMap<String, ActiveCompaction>();
120 
121   @Override
122   public void setup(final RegionServerServices server) {
123     server.getChoreService().scheduleChore(
124       new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
125 
126         @Override
127         protected void chore() {
128           tune(server.getCompactionPressure());
129         }
130       });
131   }
132 
133   private void tune(double compactionPressure) {
134     double maxThroughputToSet;
135     if (compactionPressure > 1.0) {
136       // set to unlimited if some stores already reach the blocking store file count
137       maxThroughputToSet = Double.MAX_VALUE;
138     } else if (offPeakHours.isOffPeakHour()) {
139       maxThroughputToSet = maxThroughputOffpeak;
140     } else {
141       // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
142       // calculate the throughput limitation.
143       maxThroughputToSet =
144           maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
145               * compactionPressure;
146     }
147     if (LOG.isDebugEnabled()) {
148       LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
149           + throughputDesc(maxThroughputToSet));
150     }
151     this.maxThroughput = maxThroughputToSet;
152   }
153 
154   @Override
155   public void setConf(Configuration conf) {
156     super.setConf(conf);
157     if (conf == null) {
158       return;
159     }
160     this.maxThroughputHigherBound =
161         conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
162           DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
163     this.maxThroughputLowerBound =
164         conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
165           DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
166     this.maxThroughputOffpeak =
167         conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
168           DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
169     this.offPeakHours = OffPeakHours.getInstance(conf);
170     this.controlPerSize = this.maxThroughputLowerBound;
171     this.maxThroughput = this.maxThroughputLowerBound;
172     this.tuningPeriod =
173         getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
174           DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
175     LOG.info("Compaction throughput configurations, higher bound: "
176         + throughputDesc(maxThroughputHigherBound) + ", lower bound "
177         + throughputDesc(maxThroughputLowerBound) + ", off peak: "
178         + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
179   }
180 
181   private String throughputDesc(long deltaSize, long elapsedTime) {
182     return throughputDesc((double) deltaSize / elapsedTime * 1000);
183   }
184 
185   private String throughputDesc(double speed) {
186     if (speed >= 1E15) { // large enough to say it is unlimited
187       return "unlimited";
188     } else {
189       return String.format("%.2f MB/sec", speed / 1024 / 1024);
190     }
191   }
192 
193   @Override
194   public void start(String compactionName) {
195     activeCompactions.put(compactionName, new ActiveCompaction());
196   }
197 
198   @Override
199   public long control(String compactionName, long size) throws InterruptedException {
200     ActiveCompaction compaction = activeCompactions.get(compactionName);
201     compaction.totalSize += size;
202     long deltaSize = compaction.totalSize - compaction.lastControlSize;
203     if (deltaSize < controlPerSize) {
204       return 0;
205     }
206     long now = EnvironmentEdgeManager.currentTime();
207     double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
208     long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
209     long elapsedTime = now - compaction.lastControlTime;
210     compaction.lastControlSize = compaction.totalSize;
211     if (elapsedTime >= minTimeAllowed) {
212       compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
213       return 0;
214     }
215     // too fast
216     long sleepTime = minTimeAllowed - elapsedTime;
217     if (LOG.isDebugEnabled()) {
218       // do not log too much
219       if (now - compaction.lastLogTime > 60L * 1000) {
220         LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
221             + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
222             + throughputDesc(maxThroughputPerCompaction) + ", already slept "
223             + compaction.numberOfSleeps + " time(s) and total slept time is "
224             + compaction.totalSleepTime + " ms till now.");
225         compaction.lastLogTime = now;
226       }
227     }
228     Thread.sleep(sleepTime);
229     compaction.numberOfSleeps++;
230     compaction.totalSleepTime += sleepTime;
231     compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
232     return sleepTime;
233   }
234 
235   @Override
236   public void finish(String compactionName) {
237     ActiveCompaction compaction = activeCompactions.remove(compactionName);
238     long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
239     LOG.info(compactionName + " average throughput is "
240         + throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
241         + compaction.numberOfSleeps + " time(s) and total slept time is "
242         + compaction.totalSleepTime + " ms. " + activeCompactions.size()
243         + " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
244   }
245 
246   private volatile boolean stopped = false;
247 
248   @Override
249   public void stop(String why) {
250     stopped = true;
251   }
252 
253   @Override
254   public boolean isStopped() {
255     return stopped;
256   }
257 
258   @Override
259   public String toString() {
260     return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput)
261         + ", activeCompactions=" + activeCompactions.size() + "]";
262   }
263 }