1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
22
23 import java.lang.management.ManagementFactory;
24 import java.lang.management.MemoryUsage;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.ChoreService;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.ScheduledChore;
33 import org.apache.hadoop.hbase.Server;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.io.hfile.BlockCache;
36 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
38 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
39 import org.apache.hadoop.util.ReflectionUtils;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43
44
45
46 @InterfaceAudience.Private
47 public class HeapMemoryManager {
48 private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
49 private static final int CONVERT_TO_PERCENTAGE = 100;
50 private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD =
51 (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
52
53 public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
54 public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
55 public static final String MEMSTORE_SIZE_MAX_RANGE_KEY =
56 "hbase.regionserver.global.memstore.size.max.range";
57 public static final String MEMSTORE_SIZE_MIN_RANGE_KEY =
58 "hbase.regionserver.global.memstore.size.min.range";
59 public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD =
60 "hbase.regionserver.heapmemory.tuner.period";
61 public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000;
62 public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS =
63 "hbase.regionserver.heapmemory.tuner.class";
64
65 public static final float HEAP_OCCUPANCY_ERROR_VALUE = -0.0f;
66
67 private float globalMemStorePercent;
68 private float globalMemStorePercentMinRange;
69 private float globalMemStorePercentMaxRange;
70
71 private float blockCachePercent;
72 private float blockCachePercentMinRange;
73 private float blockCachePercentMaxRange;
74 private float l2BlockCachePercent;
75
76 private float heapOccupancyPercent;
77
78 private final ResizableBlockCache blockCache;
79 private final FlushRequester memStoreFlusher;
80 private final Server server;
81 private final RegionServerAccounting regionServerAccounting;
82
83 private HeapMemoryTunerChore heapMemTunerChore = null;
84 private final boolean tunerOn;
85 private final int defaultChorePeriod;
86 private final float heapOccupancyLowWatermark;
87
88 private final long maxHeapSize;
89 {
90
91 long tempMaxHeap = -1L;
92 try {
93 final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
94 if (usage != null) {
95 tempMaxHeap = usage.getMax();
96 }
97 } finally {
98 maxHeapSize = tempMaxHeap;
99 }
100 }
101
102 public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
103 Server server, RegionServerAccounting regionServerAccounting) {
104 BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
105 if (blockCache instanceof ResizableBlockCache) {
106 return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server,
107 regionServerAccounting);
108 }
109 return null;
110 }
111
112 @VisibleForTesting
113 HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
114 Server server, RegionServerAccounting regionServerAccounting) {
115 Configuration conf = server.getConfiguration();
116 this.blockCache = blockCache;
117 this.memStoreFlusher = memStoreFlusher;
118 this.server = server;
119 this.regionServerAccounting = regionServerAccounting;
120 this.tunerOn = doInit(conf);
121 this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
122 HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
123 this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
124 HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
125 }
126
127 private boolean doInit(Configuration conf) {
128 boolean tuningEnabled = true;
129 globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
130 blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
131 HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
132 HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
133
134 globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
135 globalMemStorePercent);
136 globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
137 globalMemStorePercent);
138 if (globalMemStorePercent < globalMemStorePercentMinRange) {
139 LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
140 + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
141 + " because supplied value greater than initial memstore size value.");
142 globalMemStorePercentMinRange = globalMemStorePercent;
143 conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
144 }
145 if (globalMemStorePercent > globalMemStorePercentMaxRange) {
146 LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
147 + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
148 + " because supplied value less than initial memstore size value.");
149 globalMemStorePercentMaxRange = globalMemStorePercent;
150 conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
151 }
152 if (globalMemStorePercent == globalMemStorePercentMinRange
153 && globalMemStorePercent == globalMemStorePercentMaxRange) {
154 tuningEnabled = false;
155 }
156
157 blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
158 blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
159 if (blockCachePercent < blockCachePercentMinRange) {
160 LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
161 + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
162 + " because supplied value greater than initial block cache size.");
163 blockCachePercentMinRange = blockCachePercent;
164 conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
165 }
166 if (blockCachePercent > blockCachePercentMaxRange) {
167 LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
168 + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
169 + " because supplied value less than initial block cache size.");
170 blockCachePercentMaxRange = blockCachePercent;
171 conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
172 }
173 if (tuningEnabled && blockCachePercent == blockCachePercentMinRange
174 && blockCachePercent == blockCachePercentMaxRange) {
175 tuningEnabled = false;
176 }
177
178 int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
179 this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
180 int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
181 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
182 throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
183 + "the threshold required for successful cluster operation. "
184 + "The combined value cannot exceed 0.8. Please check the settings for "
185 + MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY
186 + " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is "
187 + globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is "
188 + blockCachePercentMinRange);
189 }
190 gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
191 bcul = (int) ((blockCachePercentMaxRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
192 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
193 throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
194 + "the threshold required for successful cluster operation. "
195 + "The combined value cannot exceed 0.8. Please check the settings for "
196 + MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY
197 + " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is "
198 + globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is "
199 + blockCachePercentMaxRange);
200 }
201 return tuningEnabled;
202 }
203
204 public void start(ChoreService service) {
205 LOG.info("Starting HeapMemoryTuner chore.");
206 this.heapMemTunerChore = new HeapMemoryTunerChore();
207 service.scheduleChore(heapMemTunerChore);
208 if (tunerOn) {
209
210 memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
211 }
212 }
213
214 public void stop() {
215
216 LOG.info("Stoping HeapMemoryTuner chore.");
217 this.heapMemTunerChore.cancel(true);
218
219 }
220
221
222 boolean isTunerOn() {
223 return this.tunerOn;
224 }
225
226
227
228
229 public float getHeapOccupancyPercent() {
230 return this.heapOccupancyPercent == Float.MAX_VALUE ? HEAP_OCCUPANCY_ERROR_VALUE : this.heapOccupancyPercent;
231 }
232
233 private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener {
234 private HeapMemoryTuner heapMemTuner;
235 private AtomicLong blockedFlushCount = new AtomicLong();
236 private AtomicLong unblockedFlushCount = new AtomicLong();
237 private long evictCount = 0L;
238 private long cacheMissCount = 0L;
239 private TunerContext tunerContext = new TunerContext();
240 private boolean alarming = false;
241
242 public HeapMemoryTunerChore() {
243 super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod);
244 Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
245 HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
246 heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
247 }
248
249 @Override
250 protected void chore() {
251
252 final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
253 if (usage != null) {
254 heapOccupancyPercent = (float)usage.getUsed() / (float)usage.getCommitted();
255 } else {
256
257
258
259 heapOccupancyPercent = Float.MAX_VALUE;
260 }
261
262
263 if (heapOccupancyPercent >= heapOccupancyLowWatermark) {
264 if (!alarming) {
265 LOG.warn("heapOccupancyPercent " + heapOccupancyPercent +
266 " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
267 alarming = true;
268 }
269
270 triggerNow();
271 try {
272
273
274 Thread.sleep(1000);
275 } catch (InterruptedException e) {
276
277 Thread.currentThread().interrupt();
278 }
279 } else {
280 if (alarming) {
281 LOG.info("heapOccupancyPercent " + heapOccupancyPercent +
282 " is now below the heap occupancy alarm watermark (" +
283 heapOccupancyLowWatermark + ")");
284 alarming = false;
285 }
286 }
287
288 if (tunerOn && !alarming) {
289 tune();
290 }
291 }
292
293 private void tune() {
294
295
296 long curEvictCount;
297 long curCacheMisCount;
298 curEvictCount = blockCache.getStats().getEvictedCount();
299 tunerContext.setEvictCount(curEvictCount - evictCount);
300 evictCount = curEvictCount;
301 curCacheMisCount = blockCache.getStats().getMissCachingCount();
302 tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
303 cacheMissCount = curCacheMisCount;
304 tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
305 tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
306 tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
307 tunerContext.setCurMemStoreUsed(
308 (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
309 tunerContext.setCurBlockCacheSize(blockCachePercent);
310 tunerContext.setCurMemStoreSize(globalMemStorePercent);
311 TunerResult result = null;
312 try {
313 result = this.heapMemTuner.tune(tunerContext);
314 } catch (Throwable t) {
315 LOG.error("Exception thrown from the HeapMemoryTuner implementation", t);
316 }
317 if (result != null && result.needsTuning()) {
318 float memstoreSize = result.getMemstoreSize();
319 float blockCacheSize = result.getBlockCacheSize();
320 LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize
321 + ". new blockCacheSize: " + blockCacheSize);
322 if (memstoreSize < globalMemStorePercentMinRange) {
323 LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level "
324 + globalMemStorePercentMinRange + ". Resetting memstoreSize to min size");
325 memstoreSize = globalMemStorePercentMinRange;
326 } else if (memstoreSize > globalMemStorePercentMaxRange) {
327 LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level "
328 + globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size");
329 memstoreSize = globalMemStorePercentMaxRange;
330 }
331 if (blockCacheSize < blockCachePercentMinRange) {
332 LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
333 + " is below min level " + blockCachePercentMinRange
334 + ". Resetting blockCacheSize to min size");
335 blockCacheSize = blockCachePercentMinRange;
336 } else if (blockCacheSize > blockCachePercentMaxRange) {
337 LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
338 + " is above max level " + blockCachePercentMaxRange
339 + ". Resetting blockCacheSize to min size");
340 blockCacheSize = blockCachePercentMaxRange;
341 }
342 int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
343 int bcul = (int) ((blockCacheSize + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
344 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
345 LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
346 + "the threshold required for successful cluster operation. "
347 + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
348 + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
349 + blockCacheSize);
350
351 } else {
352 long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
353 long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
354 LOG.info("Setting block cache heap size to " + newBlockCacheSize
355 + " and memstore heap size to " + newMemstoreSize);
356 blockCachePercent = blockCacheSize;
357 blockCache.setMaxSize(newBlockCacheSize);
358 globalMemStorePercent = memstoreSize;
359 memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
360 }
361 } else if (LOG.isDebugEnabled()) {
362 LOG.debug("No changes made by HeapMemoryTuner.");
363 }
364 }
365
366 @Override
367 public void flushRequested(FlushType type, Region region) {
368 switch (type) {
369 case ABOVE_HIGHER_MARK:
370 blockedFlushCount.incrementAndGet();
371 break;
372 case ABOVE_LOWER_MARK:
373 unblockedFlushCount.incrementAndGet();
374 break;
375 default:
376
377 break;
378 }
379 }
380 }
381
382
383
384
385
386
387 public static final class TunerContext {
388 private long blockedFlushCount;
389 private long unblockedFlushCount;
390 private long evictCount;
391 private long cacheMissCount;
392 private float curBlockCacheUsed;
393 private float curMemStoreUsed;
394 private float curMemStoreSize;
395 private float curBlockCacheSize;
396
397 public long getBlockedFlushCount() {
398 return blockedFlushCount;
399 }
400
401 public void setBlockedFlushCount(long blockedFlushCount) {
402 this.blockedFlushCount = blockedFlushCount;
403 }
404
405 public long getUnblockedFlushCount() {
406 return unblockedFlushCount;
407 }
408
409 public void setUnblockedFlushCount(long unblockedFlushCount) {
410 this.unblockedFlushCount = unblockedFlushCount;
411 }
412
413 public long getEvictCount() {
414 return evictCount;
415 }
416
417 public void setEvictCount(long evictCount) {
418 this.evictCount = evictCount;
419 }
420
421 public float getCurMemStoreSize() {
422 return curMemStoreSize;
423 }
424
425 public void setCurMemStoreSize(float curMemStoreSize) {
426 this.curMemStoreSize = curMemStoreSize;
427 }
428
429 public float getCurBlockCacheSize() {
430 return curBlockCacheSize;
431 }
432
433 public void setCurBlockCacheSize(float curBlockCacheSize) {
434 this.curBlockCacheSize = curBlockCacheSize;
435 }
436
437 public long getCacheMissCount() {
438 return cacheMissCount;
439 }
440
441 public void setCacheMissCount(long cacheMissCount) {
442 this.cacheMissCount = cacheMissCount;
443 }
444
445 public float getCurBlockCacheUsed() {
446 return curBlockCacheUsed;
447 }
448
449 public void setCurBlockCacheUsed(float curBlockCacheUsed) {
450 this.curBlockCacheUsed = curBlockCacheUsed;
451 }
452
453 public float getCurMemStoreUsed() {
454 return curMemStoreUsed;
455 }
456
457 public void setCurMemStoreUsed(float d) {
458 this.curMemStoreUsed = d;
459 }
460 }
461
462
463
464
465
466 public static final class TunerResult {
467 private float memstoreSize;
468 private float blockCacheSize;
469 private final boolean needsTuning;
470
471 public TunerResult(boolean needsTuning) {
472 this.needsTuning = needsTuning;
473 }
474
475 public float getMemstoreSize() {
476 return memstoreSize;
477 }
478
479 public void setMemstoreSize(float memstoreSize) {
480 this.memstoreSize = memstoreSize;
481 }
482
483 public float getBlockCacheSize() {
484 return blockCacheSize;
485 }
486
487 public void setBlockCacheSize(float blockCacheSize) {
488 this.blockCacheSize = blockCacheSize;
489 }
490
491 public boolean needsTuning() {
492 return needsTuning;
493 }
494 }
495 }