View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
22  
23  import java.lang.management.ManagementFactory;
24  import java.lang.management.MemoryUsage;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.ChoreService;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.ScheduledChore;
33  import org.apache.hadoop.hbase.Server;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.hfile.BlockCache;
36  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37  import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
38  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
39  import org.apache.hadoop.util.ReflectionUtils;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  
43  /**
44   * Manages tuning of Heap memory using <code>HeapMemoryTuner</code>.
45   */
46  @InterfaceAudience.Private
47  public class HeapMemoryManager {
48    private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
49    private static final int CONVERT_TO_PERCENTAGE = 100;
50    private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD = 
51      (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
52  
53    public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
54    public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
55    public static final String MEMSTORE_SIZE_MAX_RANGE_KEY = 
56        "hbase.regionserver.global.memstore.size.max.range";
57    public static final String MEMSTORE_SIZE_MIN_RANGE_KEY = 
58        "hbase.regionserver.global.memstore.size.min.range";
59    public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD = 
60        "hbase.regionserver.heapmemory.tuner.period";
61    public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000;
62    public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS = 
63        "hbase.regionserver.heapmemory.tuner.class";
64  
65    private float globalMemStorePercent;
66    private float globalMemStorePercentMinRange;
67    private float globalMemStorePercentMaxRange;
68  
69    private float blockCachePercent;
70    private float blockCachePercentMinRange;
71    private float blockCachePercentMaxRange;
72    private float l2BlockCachePercent;
73  
74    private float heapOccupancyPercent;
75  
76    private final ResizableBlockCache blockCache;
77    private final FlushRequester memStoreFlusher;
78    private final Server server;
79    private final RegionServerAccounting regionServerAccounting;
80  
81    private HeapMemoryTunerChore heapMemTunerChore = null;
82    private final boolean tunerOn;
83    private final int defaultChorePeriod;
84    private final float heapOccupancyLowWatermark;
85  
86    private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
87  
88    public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
89                  Server server, RegionServerAccounting regionServerAccounting) {
90      BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
91      if (blockCache instanceof ResizableBlockCache) {
92        return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server,
93                   regionServerAccounting);
94      }
95      return null;
96    }
97  
98    @VisibleForTesting
99    HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
100                 Server server, RegionServerAccounting regionServerAccounting) {
101     Configuration conf = server.getConfiguration();
102     this.blockCache = blockCache;
103     this.memStoreFlusher = memStoreFlusher;
104     this.server = server;
105     this.regionServerAccounting = regionServerAccounting;
106     this.tunerOn = doInit(conf);
107     this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
108       HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
109     this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
110       HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
111   }
112 
113   private boolean doInit(Configuration conf) {
114     boolean tuningEnabled = true;
115     globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
116     blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
117         HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
118     HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
119     // Initialize max and min range for memstore heap space
120     globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
121         globalMemStorePercent);
122     globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
123         globalMemStorePercent);
124     if (globalMemStorePercent < globalMemStorePercentMinRange) {
125       LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
126           + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
127           + " because supplied value greater than initial memstore size value.");
128       globalMemStorePercentMinRange = globalMemStorePercent;
129       conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
130     }
131     if (globalMemStorePercent > globalMemStorePercentMaxRange) {
132       LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
133           + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
134           + " because supplied value less than initial memstore size value.");
135       globalMemStorePercentMaxRange = globalMemStorePercent;
136       conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
137     }
138     if (globalMemStorePercent == globalMemStorePercentMinRange
139         && globalMemStorePercent == globalMemStorePercentMaxRange) {
140       tuningEnabled = false;
141     }
142     // Initialize max and min range for block cache
143     blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
144     blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
145     if (blockCachePercent < blockCachePercentMinRange) {
146       LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
147           + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
148           + " because supplied value greater than initial block cache size.");
149       blockCachePercentMinRange = blockCachePercent;
150       conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
151     }
152     if (blockCachePercent > blockCachePercentMaxRange) {
153       LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
154           + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
155           + " because supplied value less than initial block cache size.");
156       blockCachePercentMaxRange = blockCachePercent;
157       conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
158     }
159     if (tuningEnabled && blockCachePercent == blockCachePercentMinRange
160         && blockCachePercent == blockCachePercentMaxRange) {
161       tuningEnabled = false;
162     }
163 
164     int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
165     this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
166     int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
167     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
168       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
169           + "the threshold required for successful cluster operation. "
170           + "The combined value cannot exceed 0.8. Please check the settings for "
171           + MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY
172           + " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is "
173           + globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is "
174           + blockCachePercentMinRange);
175     }
176     gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
177     bcul = (int) ((blockCachePercentMaxRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
178     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
179       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
180           + "the threshold required for successful cluster operation. "
181           + "The combined value cannot exceed 0.8. Please check the settings for "
182           + MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY
183           + " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is "
184           + globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is "
185           + blockCachePercentMaxRange);
186     }
187     return tuningEnabled;
188   }
189 
190   public void start(ChoreService service) {
191       LOG.info("Starting HeapMemoryTuner chore.");
192       this.heapMemTunerChore = new HeapMemoryTunerChore();
193       service.scheduleChore(heapMemTunerChore);
194       if (tunerOn) {
195       // Register HeapMemoryTuner as a memstore flush listener
196       memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
197     }
198   }
199 
200   public void stop() {
201     // The thread is Daemon. Just interrupting the ongoing process.
202     LOG.info("Stoping HeapMemoryTuner chore.");
203     this.heapMemTunerChore.cancel(true);
204     
205   }
206 
207   // Used by the test cases.
208   boolean isTunerOn() {
209     return this.tunerOn;
210   }
211 
212   /**
213    * @return heap occupancy percentage, 0 &lt;= n &lt;= 1
214    */
215   public float getHeapOccupancyPercent() {
216     return this.heapOccupancyPercent;
217   }
218 
219   private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener {
220     private HeapMemoryTuner heapMemTuner;
221     private AtomicLong blockedFlushCount = new AtomicLong();
222     private AtomicLong unblockedFlushCount = new AtomicLong();
223     private long evictCount = 0L;
224     private long cacheMissCount = 0L;
225     private TunerContext tunerContext = new TunerContext();
226     private boolean alarming = false;
227 
228     public HeapMemoryTunerChore() {
229       super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod);
230       Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
231           HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
232       heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
233     }
234 
235     @Override
236     protected void chore() {
237       // Sample heap occupancy
238       MemoryUsage memUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
239       heapOccupancyPercent = (float)memUsage.getUsed() / (float)memUsage.getCommitted();
240       // If we are above the heap occupancy alarm low watermark, switch to short
241       // sleeps for close monitoring. Stop autotuning, we are in a danger zone.
242       if (heapOccupancyPercent >= heapOccupancyLowWatermark) {
243         if (!alarming) {
244           LOG.warn("heapOccupancyPercent " + heapOccupancyPercent +
245             " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
246           alarming = true;
247         }
248 
249         triggerNow();
250         try {
251           // Need to sleep ourselves since we've told the chore's sleeper
252           // to skip the next sleep cycle.
253           Thread.sleep(1000);
254         } catch (InterruptedException e) {
255           // Interrupted, propagate
256           Thread.currentThread().interrupt();
257         }
258       } else {
259         if (alarming) {
260           LOG.info("heapOccupancyPercent " + heapOccupancyPercent +
261             " is now below the heap occupancy alarm watermark (" +
262             heapOccupancyLowWatermark + ")");
263           alarming = false;
264         }
265       }
266       // Autotune if tuning is enabled and allowed
267       if (tunerOn && !alarming) {
268         tune();
269       }
270     }
271 
272     private void tune() {
273       // TODO check if we can increase the memory boundaries
274       // while remaining in the limits
275       long curEvictCount;
276       long curCacheMisCount;
277       curEvictCount = blockCache.getStats().getEvictedCount();
278       tunerContext.setEvictCount(curEvictCount - evictCount);
279       evictCount = curEvictCount;
280       curCacheMisCount = blockCache.getStats().getMissCachingCount();
281       tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
282       cacheMissCount = curCacheMisCount;
283       tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
284       tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
285       tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
286       tunerContext.setCurMemStoreUsed(
287                  (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
288       tunerContext.setCurBlockCacheSize(blockCachePercent);
289       tunerContext.setCurMemStoreSize(globalMemStorePercent);
290       TunerResult result = null;
291       try {
292         result = this.heapMemTuner.tune(tunerContext);
293       } catch (Throwable t) {
294         LOG.error("Exception thrown from the HeapMemoryTuner implementation", t);
295       }
296       if (result != null && result.needsTuning()) {
297         float memstoreSize = result.getMemstoreSize();
298         float blockCacheSize = result.getBlockCacheSize();
299         LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize
300             + ". new blockCacheSize: " + blockCacheSize);
301         if (memstoreSize < globalMemStorePercentMinRange) {
302           LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level "
303               + globalMemStorePercentMinRange + ". Resetting memstoreSize to min size");
304           memstoreSize = globalMemStorePercentMinRange;
305         } else if (memstoreSize > globalMemStorePercentMaxRange) {
306           LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level "
307               + globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size");
308           memstoreSize = globalMemStorePercentMaxRange;
309         }
310         if (blockCacheSize < blockCachePercentMinRange) {
311           LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
312               + " is below min level " + blockCachePercentMinRange
313               + ". Resetting blockCacheSize to min size");
314           blockCacheSize = blockCachePercentMinRange;
315         } else if (blockCacheSize > blockCachePercentMaxRange) {
316           LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
317               + " is above max level " + blockCachePercentMaxRange
318               + ". Resetting blockCacheSize to min size");
319           blockCacheSize = blockCachePercentMaxRange;
320         }
321         int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
322         int bcul = (int) ((blockCacheSize + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
323         if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
324           LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
325               + "the threshold required for successful cluster operation. "
326               + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
327               + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
328               + blockCacheSize);
329           // TODO can adjust the value so as not exceed 80%. Is that correct? may be.
330         } else {
331           long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
332           long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
333           LOG.info("Setting block cache heap size to " + newBlockCacheSize
334               + " and memstore heap size to " + newMemstoreSize);
335           blockCachePercent = blockCacheSize;
336           blockCache.setMaxSize(newBlockCacheSize);
337           globalMemStorePercent = memstoreSize;
338           memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
339         }
340       } else if (LOG.isDebugEnabled()) {
341         LOG.debug("No changes made by HeapMemoryTuner.");
342       }
343     }
344 
345     @Override
346     public void flushRequested(FlushType type, Region region) {
347       switch (type) {
348       case ABOVE_HIGHER_MARK:
349         blockedFlushCount.incrementAndGet();
350         break;
351       case ABOVE_LOWER_MARK:
352         unblockedFlushCount.incrementAndGet();
353         break;
354       default:
355         // In case of normal flush don't do any action.
356         break;
357       }
358     }
359   }
360 
361   /**
362    * POJO to pass all the relevant information required to do the heap memory tuning. It holds the
363    * flush counts and block cache evictions happened within the interval. Also holds the current
364    * heap percentage allocated for memstore and block cache.
365    */
366   public static final class TunerContext {
367     private long blockedFlushCount;
368     private long unblockedFlushCount;
369     private long evictCount;
370     private long cacheMissCount;
371     private float curBlockCacheUsed;
372     private float curMemStoreUsed;
373     private float curMemStoreSize;
374     private float curBlockCacheSize;
375 
376     public long getBlockedFlushCount() {
377       return blockedFlushCount;
378     }
379 
380     public void setBlockedFlushCount(long blockedFlushCount) {
381       this.blockedFlushCount = blockedFlushCount;
382     }
383 
384     public long getUnblockedFlushCount() {
385       return unblockedFlushCount;
386     }
387 
388     public void setUnblockedFlushCount(long unblockedFlushCount) {
389       this.unblockedFlushCount = unblockedFlushCount;
390     }
391 
392     public long getEvictCount() {
393       return evictCount;
394     }
395 
396     public void setEvictCount(long evictCount) {
397       this.evictCount = evictCount;
398     }
399 
400     public float getCurMemStoreSize() {
401       return curMemStoreSize;
402     }
403 
404     public void setCurMemStoreSize(float curMemStoreSize) {
405       this.curMemStoreSize = curMemStoreSize;
406     }
407 
408     public float getCurBlockCacheSize() {
409       return curBlockCacheSize;
410     }
411 
412     public void setCurBlockCacheSize(float curBlockCacheSize) {
413       this.curBlockCacheSize = curBlockCacheSize;
414     }
415 
416     public long getCacheMissCount() {
417       return cacheMissCount;
418     }
419 
420     public void setCacheMissCount(long cacheMissCount) {
421       this.cacheMissCount = cacheMissCount;
422     }
423 
424     public float getCurBlockCacheUsed() {
425       return curBlockCacheUsed;
426     }
427 
428     public void setCurBlockCacheUsed(float curBlockCacheUsed) {
429       this.curBlockCacheUsed = curBlockCacheUsed;
430     }
431 
432     public float getCurMemStoreUsed() {
433       return curMemStoreUsed;
434     }
435 
436     public void setCurMemStoreUsed(float d) {
437         this.curMemStoreUsed = d;
438     }
439   }
440 
441   /**
442    * POJO which holds the result of memory tuning done by HeapMemoryTuner implementation.
443    * It includes the new heap percentage for memstore and block cache.
444    */
445   public static final class TunerResult {
446     private float memstoreSize;
447     private float blockCacheSize;
448     private final boolean needsTuning;
449 
450     public TunerResult(boolean needsTuning) {
451       this.needsTuning = needsTuning;
452     }
453 
454     public float getMemstoreSize() {
455       return memstoreSize;
456     }
457 
458     public void setMemstoreSize(float memstoreSize) {
459       this.memstoreSize = memstoreSize;
460     }
461 
462     public float getBlockCacheSize() {
463       return blockCacheSize;
464     }
465 
466     public void setBlockCacheSize(float blockCacheSize) {
467       this.blockCacheSize = blockCacheSize;
468     }
469 
470     public boolean needsTuning() {
471       return needsTuning;
472     }
473   }
474 }