1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
22
23 import java.lang.management.ManagementFactory;
24 import java.lang.management.MemoryUsage;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.ChoreService;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.ScheduledChore;
33 import org.apache.hadoop.hbase.Server;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.io.hfile.BlockCache;
36 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
38 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
39 import org.apache.hadoop.util.ReflectionUtils;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43
44
45
46 @InterfaceAudience.Private
47 public class HeapMemoryManager {
48 private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
49 private static final int CONVERT_TO_PERCENTAGE = 100;
50 private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD =
51 (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
52
53 public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
54 public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
55 public static final String MEMSTORE_SIZE_MAX_RANGE_KEY =
56 "hbase.regionserver.global.memstore.size.max.range";
57 public static final String MEMSTORE_SIZE_MIN_RANGE_KEY =
58 "hbase.regionserver.global.memstore.size.min.range";
59 public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD =
60 "hbase.regionserver.heapmemory.tuner.period";
61 public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000;
62 public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS =
63 "hbase.regionserver.heapmemory.tuner.class";
64
65 private float globalMemStorePercent;
66 private float globalMemStorePercentMinRange;
67 private float globalMemStorePercentMaxRange;
68
69 private float blockCachePercent;
70 private float blockCachePercentMinRange;
71 private float blockCachePercentMaxRange;
72 private float l2BlockCachePercent;
73
74 private float heapOccupancyPercent;
75
76 private final ResizableBlockCache blockCache;
77 private final FlushRequester memStoreFlusher;
78 private final Server server;
79
80 private HeapMemoryTunerChore heapMemTunerChore = null;
81 private final boolean tunerOn;
82 private final int defaultChorePeriod;
83 private final float heapOccupancyLowWatermark;
84
85 private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
86
87 public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
88 Server server) {
89 BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
90 if (blockCache instanceof ResizableBlockCache) {
91 return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
92 }
93 return null;
94 }
95
96 @VisibleForTesting
97 HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
98 Server server) {
99 Configuration conf = server.getConfiguration();
100 this.blockCache = blockCache;
101 this.memStoreFlusher = memStoreFlusher;
102 this.server = server;
103 this.tunerOn = doInit(conf);
104 this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
105 HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
106 this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
107 HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
108 }
109
110 private boolean doInit(Configuration conf) {
111 boolean tuningEnabled = true;
112 globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
113 blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
114 HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
115 HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
116
117 globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
118 globalMemStorePercent);
119 globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
120 globalMemStorePercent);
121 if (globalMemStorePercent < globalMemStorePercentMinRange) {
122 LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
123 + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
124 + " because supplied value greater than initial memstore size value.");
125 globalMemStorePercentMinRange = globalMemStorePercent;
126 conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
127 }
128 if (globalMemStorePercent > globalMemStorePercentMaxRange) {
129 LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
130 + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
131 + " because supplied value less than initial memstore size value.");
132 globalMemStorePercentMaxRange = globalMemStorePercent;
133 conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
134 }
135 if (globalMemStorePercent == globalMemStorePercentMinRange
136 && globalMemStorePercent == globalMemStorePercentMaxRange) {
137 tuningEnabled = false;
138 }
139
140 blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
141 blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
142 if (blockCachePercent < blockCachePercentMinRange) {
143 LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
144 + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
145 + " because supplied value greater than initial block cache size.");
146 blockCachePercentMinRange = blockCachePercent;
147 conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
148 }
149 if (blockCachePercent > blockCachePercentMaxRange) {
150 LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
151 + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
152 + " because supplied value less than initial block cache size.");
153 blockCachePercentMaxRange = blockCachePercent;
154 conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
155 }
156 if (tuningEnabled && blockCachePercent == blockCachePercentMinRange
157 && blockCachePercent == blockCachePercentMaxRange) {
158 tuningEnabled = false;
159 }
160
161 int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
162 this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
163 int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
164 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
165 throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
166 + "the threshold required for successful cluster operation. "
167 + "The combined value cannot exceed 0.8. Please check the settings for "
168 + MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY
169 + " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is "
170 + globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is "
171 + blockCachePercentMinRange);
172 }
173 gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
174 bcul = (int) ((blockCachePercentMaxRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
175 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
176 throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
177 + "the threshold required for successful cluster operation. "
178 + "The combined value cannot exceed 0.8. Please check the settings for "
179 + MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY
180 + " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is "
181 + globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is "
182 + blockCachePercentMaxRange);
183 }
184 return tuningEnabled;
185 }
186
187 public void start(ChoreService service) {
188 LOG.info("Starting HeapMemoryTuner chore.");
189 this.heapMemTunerChore = new HeapMemoryTunerChore();
190 service.scheduleChore(heapMemTunerChore);
191 if (tunerOn) {
192
193 memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
194 }
195 }
196
197 public void stop() {
198
199 LOG.info("Stoping HeapMemoryTuner chore.");
200 this.heapMemTunerChore.cancel(true);
201
202 }
203
204
205 boolean isTunerOn() {
206 return this.tunerOn;
207 }
208
209
210
211
212 public float getHeapOccupancyPercent() {
213 return this.heapOccupancyPercent;
214 }
215
216 private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener {
217 private HeapMemoryTuner heapMemTuner;
218 private AtomicLong blockedFlushCount = new AtomicLong();
219 private AtomicLong unblockedFlushCount = new AtomicLong();
220 private long evictCount = 0L;
221 private TunerContext tunerContext = new TunerContext();
222 private boolean alarming = false;
223
224 public HeapMemoryTunerChore() {
225 super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod);
226 Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
227 HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
228 heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
229 }
230
231 @Override
232 protected void chore() {
233
234 MemoryUsage memUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
235 heapOccupancyPercent = (float)memUsage.getUsed() / (float)memUsage.getCommitted();
236
237
238 if (heapOccupancyPercent >= heapOccupancyLowWatermark) {
239 if (!alarming) {
240 LOG.warn("heapOccupancyPercent " + heapOccupancyPercent +
241 " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
242 alarming = true;
243 }
244
245 triggerNow();
246 try {
247
248
249 Thread.sleep(1000);
250 } catch (InterruptedException e) {
251
252 Thread.currentThread().interrupt();
253 }
254 } else {
255 if (alarming) {
256 LOG.info("heapOccupancyPercent " + heapOccupancyPercent +
257 " is now below the heap occupancy alarm watermark (" +
258 heapOccupancyLowWatermark + ")");
259 alarming = false;
260 }
261 }
262
263 if (tunerOn && !alarming) {
264 tune();
265 }
266 }
267
268 private void tune() {
269 long curEvictCount = blockCache.getStats().getEvictedCount();
270 tunerContext.setEvictCount(curEvictCount - evictCount);
271 evictCount = curEvictCount;
272 tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
273 tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
274 tunerContext.setCurBlockCacheSize(blockCachePercent);
275 tunerContext.setCurMemStoreSize(globalMemStorePercent);
276 TunerResult result = null;
277 try {
278 result = this.heapMemTuner.tune(tunerContext);
279 } catch (Throwable t) {
280 LOG.error("Exception thrown from the HeapMemoryTuner implementation", t);
281 }
282 if (result != null && result.needsTuning()) {
283 float memstoreSize = result.getMemstoreSize();
284 float blockCacheSize = result.getBlockCacheSize();
285 LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize
286 + ". new blockCacheSize: " + blockCacheSize);
287 if (memstoreSize < globalMemStorePercentMinRange) {
288 LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level "
289 + globalMemStorePercentMinRange + ". Resetting memstoreSize to min size");
290 memstoreSize = globalMemStorePercentMinRange;
291 } else if (memstoreSize > globalMemStorePercentMaxRange) {
292 LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level "
293 + globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size");
294 memstoreSize = globalMemStorePercentMaxRange;
295 }
296 if (blockCacheSize < blockCachePercentMinRange) {
297 LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
298 + " is below min level " + blockCachePercentMinRange
299 + ". Resetting blockCacheSize to min size");
300 blockCacheSize = blockCachePercentMinRange;
301 } else if (blockCacheSize > blockCachePercentMaxRange) {
302 LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
303 + " is above max level " + blockCachePercentMaxRange
304 + ". Resetting blockCacheSize to min size");
305 blockCacheSize = blockCachePercentMaxRange;
306 }
307 int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
308 int bcul = (int) ((blockCacheSize + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
309 if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
310 LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
311 + "the threshold required for successful cluster operation. "
312 + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
313 + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
314 + blockCacheSize);
315
316 } else {
317 long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
318 long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
319 LOG.info("Setting block cache heap size to " + newBlockCacheSize
320 + " and memstore heap size to " + newMemstoreSize);
321 blockCachePercent = blockCacheSize;
322 blockCache.setMaxSize(newBlockCacheSize);
323 globalMemStorePercent = memstoreSize;
324 memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
325 }
326 }
327 }
328
329 @Override
330 public void flushRequested(FlushType type, Region region) {
331 switch (type) {
332 case ABOVE_HIGHER_MARK:
333 blockedFlushCount.incrementAndGet();
334 break;
335 case ABOVE_LOWER_MARK:
336 unblockedFlushCount.incrementAndGet();
337 break;
338 default:
339
340 break;
341 }
342 }
343 }
344
345
346
347
348
349
350 public static final class TunerContext {
351 private long blockedFlushCount;
352 private long unblockedFlushCount;
353 private long evictCount;
354 private float curMemStoreSize;
355 private float curBlockCacheSize;
356
357 public long getBlockedFlushCount() {
358 return blockedFlushCount;
359 }
360
361 public void setBlockedFlushCount(long blockedFlushCount) {
362 this.blockedFlushCount = blockedFlushCount;
363 }
364
365 public long getUnblockedFlushCount() {
366 return unblockedFlushCount;
367 }
368
369 public void setUnblockedFlushCount(long unblockedFlushCount) {
370 this.unblockedFlushCount = unblockedFlushCount;
371 }
372
373 public long getEvictCount() {
374 return evictCount;
375 }
376
377 public void setEvictCount(long evictCount) {
378 this.evictCount = evictCount;
379 }
380
381 public float getCurMemStoreSize() {
382 return curMemStoreSize;
383 }
384
385 public void setCurMemStoreSize(float curMemStoreSize) {
386 this.curMemStoreSize = curMemStoreSize;
387 }
388
389 public float getCurBlockCacheSize() {
390 return curBlockCacheSize;
391 }
392
393 public void setCurBlockCacheSize(float curBlockCacheSize) {
394 this.curBlockCacheSize = curBlockCacheSize;
395 }
396 }
397
398
399
400
401
402 public static final class TunerResult {
403 private float memstoreSize;
404 private float blockCacheSize;
405 private final boolean needsTuning;
406
407 public TunerResult(boolean needsTuning) {
408 this.needsTuning = needsTuning;
409 }
410
411 public float getMemstoreSize() {
412 return memstoreSize;
413 }
414
415 public void setMemstoreSize(float memstoreSize) {
416 this.memstoreSize = memstoreSize;
417 }
418
419 public float getBlockCacheSize() {
420 return blockCacheSize;
421 }
422
423 public void setBlockCacheSize(float blockCacheSize) {
424 this.blockCacheSize = blockCacheSize;
425 }
426
427 public boolean needsTuning() {
428 return needsTuning;
429 }
430 }
431 }