View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
22  
23  import java.lang.management.ManagementFactory;
24  import java.lang.management.MemoryUsage;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.ChoreService;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.ScheduledChore;
33  import org.apache.hadoop.hbase.Server;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.hfile.BlockCache;
36  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37  import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
38  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
39  import org.apache.hadoop.util.ReflectionUtils;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  
43  /**
44   * Manages tuning of Heap memory using <code>HeapMemoryTuner</code>.
45   */
46  @InterfaceAudience.Private
47  public class HeapMemoryManager {
48    private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
49    private static final int CONVERT_TO_PERCENTAGE = 100;
50    private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD = 
51      (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
52  
53    public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
54    public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
55    public static final String MEMSTORE_SIZE_MAX_RANGE_KEY = 
56        "hbase.regionserver.global.memstore.size.max.range";
57    public static final String MEMSTORE_SIZE_MIN_RANGE_KEY = 
58        "hbase.regionserver.global.memstore.size.min.range";
59    public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD = 
60        "hbase.regionserver.heapmemory.tuner.period";
61    public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000;
62    public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS = 
63        "hbase.regionserver.heapmemory.tuner.class";
64  
65    private float globalMemStorePercent;
66    private float globalMemStorePercentMinRange;
67    private float globalMemStorePercentMaxRange;
68  
69    private float blockCachePercent;
70    private float blockCachePercentMinRange;
71    private float blockCachePercentMaxRange;
72    private float l2BlockCachePercent;
73  
74    private float heapOccupancyPercent;
75  
76    private final ResizableBlockCache blockCache;
77    private final FlushRequester memStoreFlusher;
78    private final Server server;
79  
80    private HeapMemoryTunerChore heapMemTunerChore = null;
81    private final boolean tunerOn;
82    private final int defaultChorePeriod;
83    private final float heapOccupancyLowWatermark;
84  
85    private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
86  
87    public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
88        Server server) {
89      BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
90      if (blockCache instanceof ResizableBlockCache) {
91        return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
92      }
93      return null;
94    }
95  
96    @VisibleForTesting
97    HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
98        Server server) {
99      Configuration conf = server.getConfiguration();
100     this.blockCache = blockCache;
101     this.memStoreFlusher = memStoreFlusher;
102     this.server = server;
103     this.tunerOn = doInit(conf);
104     this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
105       HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
106     this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
107       HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
108   }
109 
110   private boolean doInit(Configuration conf) {
111     boolean tuningEnabled = true;
112     globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
113     blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
114         HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
115     HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
116     // Initialize max and min range for memstore heap space
117     globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
118         globalMemStorePercent);
119     globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
120         globalMemStorePercent);
121     if (globalMemStorePercent < globalMemStorePercentMinRange) {
122       LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
123           + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
124           + " because supplied value greater than initial memstore size value.");
125       globalMemStorePercentMinRange = globalMemStorePercent;
126       conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
127     }
128     if (globalMemStorePercent > globalMemStorePercentMaxRange) {
129       LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
130           + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
131           + " because supplied value less than initial memstore size value.");
132       globalMemStorePercentMaxRange = globalMemStorePercent;
133       conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
134     }
135     if (globalMemStorePercent == globalMemStorePercentMinRange
136         && globalMemStorePercent == globalMemStorePercentMaxRange) {
137       tuningEnabled = false;
138     }
139     // Initialize max and min range for block cache
140     blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
141     blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
142     if (blockCachePercent < blockCachePercentMinRange) {
143       LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
144           + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
145           + " because supplied value greater than initial block cache size.");
146       blockCachePercentMinRange = blockCachePercent;
147       conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
148     }
149     if (blockCachePercent > blockCachePercentMaxRange) {
150       LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
151           + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
152           + " because supplied value less than initial block cache size.");
153       blockCachePercentMaxRange = blockCachePercent;
154       conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
155     }
156     if (tuningEnabled && blockCachePercent == blockCachePercentMinRange
157         && blockCachePercent == blockCachePercentMaxRange) {
158       tuningEnabled = false;
159     }
160 
161     int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
162     this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
163     int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
164     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
165       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
166           + "the threshold required for successful cluster operation. "
167           + "The combined value cannot exceed 0.8. Please check the settings for "
168           + MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY
169           + " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is "
170           + globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is "
171           + blockCachePercentMinRange);
172     }
173     gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
174     bcul = (int) ((blockCachePercentMaxRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
175     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
176       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
177           + "the threshold required for successful cluster operation. "
178           + "The combined value cannot exceed 0.8. Please check the settings for "
179           + MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY
180           + " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is "
181           + globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is "
182           + blockCachePercentMaxRange);
183     }
184     return tuningEnabled;
185   }
186 
187   public void start(ChoreService service) {
188       LOG.info("Starting HeapMemoryTuner chore.");
189       this.heapMemTunerChore = new HeapMemoryTunerChore();
190       service.scheduleChore(heapMemTunerChore);
191       if (tunerOn) {
192       // Register HeapMemoryTuner as a memstore flush listener
193       memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
194     }
195   }
196 
197   public void stop() {
198     // The thread is Daemon. Just interrupting the ongoing process.
199     LOG.info("Stoping HeapMemoryTuner chore.");
200     this.heapMemTunerChore.cancel(true);
201     
202   }
203 
204   // Used by the test cases.
205   boolean isTunerOn() {
206     return this.tunerOn;
207   }
208 
209   /**
210    * @return heap occupancy percentage, 0 <= n <= 1
211    */
212   public float getHeapOccupancyPercent() {
213     return this.heapOccupancyPercent;
214   }
215 
216   private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener {
217     private HeapMemoryTuner heapMemTuner;
218     private AtomicLong blockedFlushCount = new AtomicLong();
219     private AtomicLong unblockedFlushCount = new AtomicLong();
220     private long evictCount = 0L;
221     private TunerContext tunerContext = new TunerContext();
222     private boolean alarming = false;
223 
224     public HeapMemoryTunerChore() {
225       super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod);
226       Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
227           HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
228       heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
229     }
230 
231     @Override
232     protected void chore() {
233       // Sample heap occupancy
234       MemoryUsage memUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
235       heapOccupancyPercent = (float)memUsage.getUsed() / (float)memUsage.getCommitted();
236       // If we are above the heap occupancy alarm low watermark, switch to short
237       // sleeps for close monitoring. Stop autotuning, we are in a danger zone.
238       if (heapOccupancyPercent >= heapOccupancyLowWatermark) {
239         if (!alarming) {
240           LOG.warn("heapOccupancyPercent " + heapOccupancyPercent +
241             " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
242           alarming = true;
243         }
244 
245         triggerNow();
246         try {
247           // Need to sleep ourselves since we've told the chore's sleeper
248           // to skip the next sleep cycle.
249           Thread.sleep(1000);
250         } catch (InterruptedException e) {
251           // Interrupted, propagate
252           Thread.currentThread().interrupt();
253         }
254       } else {
255         if (alarming) {
256           LOG.info("heapOccupancyPercent " + heapOccupancyPercent +
257             " is now below the heap occupancy alarm watermark (" +
258             heapOccupancyLowWatermark + ")");
259           alarming = false;
260         }
261       }
262       // Autotune if tuning is enabled and allowed
263       if (tunerOn && !alarming) {
264         tune();
265       }
266     }
267 
268     private void tune() {
269       long curEvictCount = blockCache.getStats().getEvictedCount();
270       tunerContext.setEvictCount(curEvictCount - evictCount);
271       evictCount = curEvictCount;
272       tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
273       tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
274       tunerContext.setCurBlockCacheSize(blockCachePercent);
275       tunerContext.setCurMemStoreSize(globalMemStorePercent);
276       TunerResult result = null;
277       try {
278         result = this.heapMemTuner.tune(tunerContext);
279       } catch (Throwable t) {
280         LOG.error("Exception thrown from the HeapMemoryTuner implementation", t);
281       }
282       if (result != null && result.needsTuning()) {
283         float memstoreSize = result.getMemstoreSize();
284         float blockCacheSize = result.getBlockCacheSize();
285         LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize
286             + ". new blockCacheSize: " + blockCacheSize);
287         if (memstoreSize < globalMemStorePercentMinRange) {
288           LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level "
289               + globalMemStorePercentMinRange + ". Resetting memstoreSize to min size");
290           memstoreSize = globalMemStorePercentMinRange;
291         } else if (memstoreSize > globalMemStorePercentMaxRange) {
292           LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level "
293               + globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size");
294           memstoreSize = globalMemStorePercentMaxRange;
295         }
296         if (blockCacheSize < blockCachePercentMinRange) {
297           LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
298               + " is below min level " + blockCachePercentMinRange
299               + ". Resetting blockCacheSize to min size");
300           blockCacheSize = blockCachePercentMinRange;
301         } else if (blockCacheSize > blockCachePercentMaxRange) {
302           LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
303               + " is above max level " + blockCachePercentMaxRange
304               + ". Resetting blockCacheSize to min size");
305           blockCacheSize = blockCachePercentMaxRange;
306         }
307         int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
308         int bcul = (int) ((blockCacheSize + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
309         if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
310           LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
311               + "the threshold required for successful cluster operation. "
312               + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
313               + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
314               + blockCacheSize);
315           // TODO can adjust the value so as not exceed 80%. Is that correct? may be.
316         } else {
317           long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
318           long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
319           LOG.info("Setting block cache heap size to " + newBlockCacheSize
320               + " and memstore heap size to " + newMemstoreSize);
321           blockCachePercent = blockCacheSize;
322           blockCache.setMaxSize(newBlockCacheSize);
323           globalMemStorePercent = memstoreSize;
324           memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
325         }
326       }
327     }
328 
329     @Override
330     public void flushRequested(FlushType type, Region region) {
331       switch (type) {
332       case ABOVE_HIGHER_MARK:
333         blockedFlushCount.incrementAndGet();
334         break;
335       case ABOVE_LOWER_MARK:
336         unblockedFlushCount.incrementAndGet();
337         break;
338       default:
339         // In case of normal flush don't do any action.
340         break;
341       }
342     }
343   }
344 
345   /**
346    * POJO to pass all the relevant information required to do the heap memory tuning. It holds the
347    * flush counts and block cache evictions happened within the interval. Also holds the current
348    * heap percentage allocated for memstore and block cache.
349    */
350   public static final class TunerContext {
351     private long blockedFlushCount;
352     private long unblockedFlushCount;
353     private long evictCount;
354     private float curMemStoreSize;
355     private float curBlockCacheSize;
356 
357     public long getBlockedFlushCount() {
358       return blockedFlushCount;
359     }
360 
361     public void setBlockedFlushCount(long blockedFlushCount) {
362       this.blockedFlushCount = blockedFlushCount;
363     }
364 
365     public long getUnblockedFlushCount() {
366       return unblockedFlushCount;
367     }
368 
369     public void setUnblockedFlushCount(long unblockedFlushCount) {
370       this.unblockedFlushCount = unblockedFlushCount;
371     }
372 
373     public long getEvictCount() {
374       return evictCount;
375     }
376 
377     public void setEvictCount(long evictCount) {
378       this.evictCount = evictCount;
379     }
380 
381     public float getCurMemStoreSize() {
382       return curMemStoreSize;
383     }
384 
385     public void setCurMemStoreSize(float curMemStoreSize) {
386       this.curMemStoreSize = curMemStoreSize;
387     }
388 
389     public float getCurBlockCacheSize() {
390       return curBlockCacheSize;
391     }
392 
393     public void setCurBlockCacheSize(float curBlockCacheSize) {
394       this.curBlockCacheSize = curBlockCacheSize;
395     }
396   }
397 
398   /**
399    * POJO which holds the result of memory tuning done by HeapMemoryTuner implementation.
400    * It includes the new heap percentage for memstore and block cache.
401    */
402   public static final class TunerResult {
403     private float memstoreSize;
404     private float blockCacheSize;
405     private final boolean needsTuning;
406 
407     public TunerResult(boolean needsTuning) {
408       this.needsTuning = needsTuning;
409     }
410 
411     public float getMemstoreSize() {
412       return memstoreSize;
413     }
414 
415     public void setMemstoreSize(float memstoreSize) {
416       this.memstoreSize = memstoreSize;
417     }
418 
419     public float getBlockCacheSize() {
420       return blockCacheSize;
421     }
422 
423     public void setBlockCacheSize(float blockCacheSize) {
424       this.blockCacheSize = blockCacheSize;
425     }
426 
427     public boolean needsTuning() {
428       return needsTuning;
429     }
430   }
431 }