1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.io.IOException;
22 import java.lang.ref.WeakReference;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.EnumMap;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.PriorityQueue;
31 import java.util.SortedSet;
32 import java.util.TreeSet;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicLong;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.hbase.io.HeapSize;
47 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
48 import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
49 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.ClassSize;
52 import org.apache.hadoop.hbase.util.FSUtils;
53 import org.apache.hadoop.hbase.util.HasThread;
54 import org.apache.hadoop.hbase.util.Threads;
55 import org.apache.hadoop.util.StringUtils;
56
57 import com.google.common.util.concurrent.ThreadFactoryBuilder;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @InterfaceAudience.Private
98 public class LruBlockCache implements BlockCache, HeapSize {
99
100 static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101
102 static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
103 static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
104
105
106
107
108 static final float DEFAULT_LOAD_FACTOR = 0.75f;
109 static final int DEFAULT_CONCURRENCY_LEVEL = 16;
110
111
112 static final float DEFAULT_MIN_FACTOR = 0.95f;
113 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
114
115
116 static final float DEFAULT_SINGLE_FACTOR = 0.25f;
117 static final float DEFAULT_MULTI_FACTOR = 0.50f;
118 static final float DEFAULT_MEMORY_FACTOR = 0.25f;
119
120
121 static final int statThreadPeriod = 60 * 5;
122
123
124 private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;
125
126
127 private final ReentrantLock evictionLock = new ReentrantLock(true);
128
129
130 private volatile boolean evictionInProgress = false;
131
132
133 private final EvictionThread evictionThread;
134
135
136 private final ScheduledExecutorService scheduleThreadPool =
137 Executors.newScheduledThreadPool(1,
138 new ThreadFactoryBuilder()
139 .setNameFormat("LRU Statistics #%d")
140 .setDaemon(true)
141 .build());
142
143
144 private final AtomicLong size;
145
146
147 private final AtomicLong elements;
148
149
150 private final AtomicLong count;
151
152
153 private final CacheStats stats;
154
155
156 private long maxSize;
157
158
159 private long blockSize;
160
161
162 private float acceptableFactor;
163
164
165 private float minFactor;
166
167
168 private float singleFactor;
169
170
171 private float multiFactor;
172
173
174 private float memoryFactor;
175
176
177 private long overhead;
178
179
180 private BucketCache victimHandler = null;
181
182
183
184
185
186
187
188
189
190
/**
 * Creates a cache with default factors and a background eviction thread.
 *
 * @param maxSize maximum size of the cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 */
public LruBlockCache(long maxSize, long blockSize) {
  this(maxSize, blockSize, true);
}

/**
 * Creates a cache with default factors.
 *
 * @param maxSize maximum size of the cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param evictionThread whether evictions run on a dedicated background thread
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
  this(
      maxSize,
      blockSize,
      evictionThread,
      (int) Math.ceil(1.2 * maxSize / blockSize), // initial map capacity estimate
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      DEFAULT_MIN_FACTOR,
      DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR,
      DEFAULT_MULTI_FACTOR,
      DEFAULT_MEMORY_FACTOR);
}

/**
 * Creates a cache whose minimum and acceptable factors are read from the configuration,
 * falling back to the defaults when the keys are absent.
 *
 * @param maxSize maximum size of the cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param evictionThread whether evictions run on a dedicated background thread
 * @param conf configuration consulted for the two factor keys
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
  this(
      maxSize,
      blockSize,
      evictionThread,
      (int) Math.ceil(1.2 * maxSize / blockSize), // initial map capacity estimate
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      DEFAULT_SINGLE_FACTOR,
      DEFAULT_MULTI_FACTOR,
      DEFAULT_MEMORY_FACTOR);
}

/**
 * Creates a configuration-driven cache with a background eviction thread.
 *
 * @param maxSize maximum size of the cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param conf configuration consulted for the two factor keys
 */
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
  this(maxSize, blockSize, true, conf);
}
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
238 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
239 float minFactor, float acceptableFactor,
240 float singleFactor, float multiFactor, float memoryFactor) {
241 if(singleFactor + multiFactor + memoryFactor != 1) {
242 throw new IllegalArgumentException("Single, multi, and memory factors " +
243 " should total 1.0");
244 }
245 if(minFactor >= acceptableFactor) {
246 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
247 }
248 if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
249 throw new IllegalArgumentException("all factors must be < 1");
250 }
251 this.maxSize = maxSize;
252 this.blockSize = blockSize;
253 map = new ConcurrentHashMap<BlockCacheKey,CachedBlock>(mapInitialSize,
254 mapLoadFactor, mapConcurrencyLevel);
255 this.minFactor = minFactor;
256 this.acceptableFactor = acceptableFactor;
257 this.singleFactor = singleFactor;
258 this.multiFactor = multiFactor;
259 this.memoryFactor = memoryFactor;
260 this.stats = new CacheStats();
261 this.count = new AtomicLong(0);
262 this.elements = new AtomicLong(0);
263 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
264 this.size = new AtomicLong(this.overhead);
265 if(evictionThread) {
266 this.evictionThread = new EvictionThread(this);
267 this.evictionThread.start();
268 } else {
269 this.evictionThread = null;
270 }
271 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
272 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
273 }
274
275 public void setMaxSize(long maxSize) {
276 this.maxSize = maxSize;
277 if(this.size.get() > acceptableSize() && !evictionInProgress) {
278 runEviction();
279 }
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293 @Override
294 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
295 CachedBlock cb = map.get(cacheKey);
296 if(cb != null) {
297
298 if (compare(buf, cb.getBuffer()) != 0) {
299 throw new RuntimeException("Cached block contents differ, which should not have happened."
300 + "cacheKey:" + cacheKey);
301 }
302 String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
303 msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
304 LOG.warn(msg);
305 assert false : msg;
306 return;
307 }
308 cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
309 long newSize = updateSizeMetrics(cb, false);
310 map.put(cacheKey, cb);
311 elements.incrementAndGet();
312 if(newSize > acceptableSize() && !evictionInProgress) {
313 runEviction();
314 }
315 }
316
317 private int compare(Cacheable left, Cacheable right) {
318 ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
319 left.serialize(l);
320 ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
321 right.serialize(r);
322 return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
323 r.array(), r.arrayOffset(), r.limit());
324 }
325
326
327
328
329
330
331
332
333
334
335
336 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
337 cacheBlock(cacheKey, buf, false);
338 }
339
340
341
342
343
344
345
346
347
348 protected long updateSizeMetrics(CachedBlock cb, boolean evict) {
349 long heapsize = cb.heapSize();
350 if (evict) {
351 heapsize *= -1;
352 }
353 return size.addAndGet(heapsize);
354 }
355
356
357
358
359
360
361
362
363
364
365 @Override
366 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
367 CachedBlock cb = map.get(cacheKey);
368 if(cb == null) {
369 if (!repeat) stats.miss(caching);
370 if (victimHandler != null)
371 return victimHandler.getBlock(cacheKey, caching, repeat);
372 return null;
373 }
374 stats.hit(caching);
375 cb.access(count.incrementAndGet());
376 return cb.getBuffer();
377 }
378
379
380
381
382
383
384 public boolean containsBlock(BlockCacheKey cacheKey) {
385 return map.containsKey(cacheKey);
386 }
387
388 @Override
389 public boolean evictBlock(BlockCacheKey cacheKey) {
390 CachedBlock cb = map.get(cacheKey);
391 if (cb == null) return false;
392 evictBlock(cb, false);
393 return true;
394 }
395
396
397
398
399
400
401
402
403
404
405
406 @Override
407 public int evictBlocksByHfileName(String hfileName) {
408 int numEvicted = 0;
409 for (BlockCacheKey key : map.keySet()) {
410 if (key.getHfileName().equals(hfileName)) {
411 if (evictBlock(key))
412 ++numEvicted;
413 }
414 }
415 if (victimHandler != null) {
416 numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
417 }
418 return numEvicted;
419 }
420
421
422
423
424
425
426
427
428
429 protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
430 map.remove(block.getCacheKey());
431 updateSizeMetrics(block, true);
432 elements.decrementAndGet();
433 stats.evicted();
434 if (evictedByEvictionProcess && victimHandler != null) {
435 boolean wait = getCurrentSize() < acceptableSize();
436 boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
437 victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
438 inMemory, wait);
439 }
440 return block.heapSize();
441 }
442
443
444
445
446 private void runEviction() {
447 if(evictionThread == null) {
448 evict();
449 } else {
450 evictionThread.evict();
451 }
452 }
453
454
455
456
457 void evict() {
458
459
460 if(!evictionLock.tryLock()) return;
461
462 try {
463 evictionInProgress = true;
464 long currentSize = this.size.get();
465 long bytesToFree = currentSize - minSize();
466
467 if (LOG.isDebugEnabled()) {
468 LOG.debug("Block cache LRU eviction started; Attempting to free " +
469 StringUtils.byteDesc(bytesToFree) + " of total=" +
470 StringUtils.byteDesc(currentSize));
471 }
472
473 if(bytesToFree <= 0) return;
474
475
476 BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
477 singleSize());
478 BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
479 multiSize());
480 BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
481 memorySize());
482
483
484 for(CachedBlock cachedBlock : map.values()) {
485 switch(cachedBlock.getPriority()) {
486 case SINGLE: {
487 bucketSingle.add(cachedBlock);
488 break;
489 }
490 case MULTI: {
491 bucketMulti.add(cachedBlock);
492 break;
493 }
494 case MEMORY: {
495 bucketMemory.add(cachedBlock);
496 break;
497 }
498 }
499 }
500
501 PriorityQueue<BlockBucket> bucketQueue =
502 new PriorityQueue<BlockBucket>(3);
503
504 bucketQueue.add(bucketSingle);
505 bucketQueue.add(bucketMulti);
506 bucketQueue.add(bucketMemory);
507
508 int remainingBuckets = 3;
509 long bytesFreed = 0;
510
511 BlockBucket bucket;
512 while((bucket = bucketQueue.poll()) != null) {
513 long overflow = bucket.overflow();
514 if(overflow > 0) {
515 long bucketBytesToFree = Math.min(overflow,
516 (bytesToFree - bytesFreed) / remainingBuckets);
517 bytesFreed += bucket.free(bucketBytesToFree);
518 }
519 remainingBuckets--;
520 }
521
522 if (LOG.isDebugEnabled()) {
523 long single = bucketSingle.totalSize();
524 long multi = bucketMulti.totalSize();
525 long memory = bucketMemory.totalSize();
526 LOG.debug("Block cache LRU eviction completed; " +
527 "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
528 "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
529 "single=" + StringUtils.byteDesc(single) + ", " +
530 "multi=" + StringUtils.byteDesc(multi) + ", " +
531 "memory=" + StringUtils.byteDesc(memory));
532 }
533 } finally {
534 stats.evict();
535 evictionInProgress = false;
536 evictionLock.unlock();
537 }
538 }
539
540
541
542
543
544
545
546 private class BlockBucket implements Comparable<BlockBucket> {
547
548 private CachedBlockQueue queue;
549 private long totalSize = 0;
550 private long bucketSize;
551
552 public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
553 this.bucketSize = bucketSize;
554 queue = new CachedBlockQueue(bytesToFree, blockSize);
555 totalSize = 0;
556 }
557
558 public void add(CachedBlock block) {
559 totalSize += block.heapSize();
560 queue.add(block);
561 }
562
563 public long free(long toFree) {
564 CachedBlock cb;
565 long freedBytes = 0;
566 while ((cb = queue.pollLast()) != null) {
567 freedBytes += evictBlock(cb, true);
568 if (freedBytes >= toFree) {
569 return freedBytes;
570 }
571 }
572 return freedBytes;
573 }
574
575 public long overflow() {
576 return totalSize - bucketSize;
577 }
578
579 public long totalSize() {
580 return totalSize;
581 }
582
583 public int compareTo(BlockBucket that) {
584 if(this.overflow() == that.overflow()) return 0;
585 return this.overflow() > that.overflow() ? 1 : -1;
586 }
587
588 @Override
589 public boolean equals(Object that) {
590 if (that == null || !(that instanceof BlockBucket)){
591 return false;
592 }
593
594 return compareTo(( BlockBucket)that) == 0;
595 }
596
597 }
598
599
600
601
602
603 public long getMaxSize() {
604 return this.maxSize;
605 }
606
607
608
609
610
611 public long getCurrentSize() {
612 return this.size.get();
613 }
614
615
616
617
618
619 public long getFreeSize() {
620 return getMaxSize() - getCurrentSize();
621 }
622
623
624
625
626
627 public long size() {
628 return this.elements.get();
629 }
630
631 @Override
632 public long getBlockCount() {
633 return this.elements.get();
634 }
635
636
637
638
639 public long getEvictionCount() {
640 return this.stats.getEvictionCount();
641 }
642
643
644
645
646
647 public long getEvictedCount() {
648 return this.stats.getEvictedCount();
649 }
650
651 EvictionThread getEvictionThread() {
652 return this.evictionThread;
653 }
654
655
656
657
658
659
660
661 static class EvictionThread extends HasThread {
662 private WeakReference<LruBlockCache> cache;
663 private boolean go = true;
664
665 private boolean enteringRun = false;
666
667 public EvictionThread(LruBlockCache cache) {
668 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
669 setDaemon(true);
670 this.cache = new WeakReference<LruBlockCache>(cache);
671 }
672
673 @Override
674 public void run() {
675 enteringRun = true;
676 while (this.go) {
677 synchronized(this) {
678 try {
679 this.wait();
680 } catch(InterruptedException e) {}
681 }
682 LruBlockCache cache = this.cache.get();
683 if(cache == null) break;
684 cache.evict();
685 }
686 }
687
688 public void evict() {
689 synchronized(this) {
690 this.notifyAll();
691 }
692 }
693
694 synchronized void shutdown() {
695 this.go = false;
696 this.notifyAll();
697 }
698
699
700
701
702 boolean isEnteringRun() {
703 return this.enteringRun;
704 }
705 }
706
707
708
709
710 static class StatisticsThread extends Thread {
711 LruBlockCache lru;
712
713 public StatisticsThread(LruBlockCache lru) {
714 super("LruBlockCache.StatisticsThread");
715 setDaemon(true);
716 this.lru = lru;
717 }
718 @Override
719 public void run() {
720 lru.logStats();
721 }
722 }
723
724 public void logStats() {
725 if (!LOG.isDebugEnabled()) return;
726
727 long totalSize = heapSize();
728 long freeSize = maxSize - totalSize;
729 LruBlockCache.LOG.debug("Stats: " +
730 "total=" + StringUtils.byteDesc(totalSize) + ", " +
731 "free=" + StringUtils.byteDesc(freeSize) + ", " +
732 "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
733 "blocks=" + size() +", " +
734 "accesses=" + stats.getRequestCount() + ", " +
735 "hits=" + stats.getHitCount() + ", " +
736 "hitRatio=" +
737 (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
738 "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
739 "cachingHits=" + stats.getHitCachingCount() + ", " +
740 "cachingHitsRatio=" +
741 (stats.getHitCachingCount() == 0 ? "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
742 "evictions=" + stats.getEvictionCount() + ", " +
743 "evicted=" + stats.getEvictedCount() + ", " +
744 "evictedPerRun=" + stats.evictedPerEviction());
745 }
746
747
748
749
750
751
752
753 public CacheStats getStats() {
754 return this.stats;
755 }
756
757 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
758 (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
759 (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
760 + ClassSize.OBJECT);
761
762
763 public long heapSize() {
764 return getCurrentSize();
765 }
766
767 public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
768
769 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
770 ((long)Math.ceil(maxSize*1.2/blockSize)
771 * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
772 ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
773 }
774
775 @Override
776 public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException {
777
778 Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap(
779 FileSystem.get(conf),
780 FSUtils.getRootDir(conf));
781
782
783
784 Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
785 new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();
786
787 for (CachedBlock cb : map.values()) {
788 String sf = cb.getCacheKey().getHfileName();
789 Path path = sfMap.get(sf);
790 if ( path != null) {
791 BlockCacheColumnFamilySummary lookup =
792 BlockCacheColumnFamilySummary.createFromStoreFilePath(path);
793 BlockCacheColumnFamilySummary bcse = bcs.get(lookup);
794 if (bcse == null) {
795 bcse = BlockCacheColumnFamilySummary.create(lookup);
796 bcs.put(lookup,bcse);
797 }
798 bcse.incrementBlocks();
799 bcse.incrementHeapSize(cb.heapSize());
800 }
801 }
802 List<BlockCacheColumnFamilySummary> list =
803 new ArrayList<BlockCacheColumnFamilySummary>(bcs.values());
804 Collections.sort( list );
805 return list;
806 }
807
808
809
810 private long acceptableSize() {
811 return (long)Math.floor(this.maxSize * this.acceptableFactor);
812 }
813 private long minSize() {
814 return (long)Math.floor(this.maxSize * this.minFactor);
815 }
816 private long singleSize() {
817 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
818 }
819 private long multiSize() {
820 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
821 }
822 private long memorySize() {
823 return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
824 }
825
826 public void shutdown() {
827 if (victimHandler != null)
828 victimHandler.shutdown();
829 this.scheduleThreadPool.shutdown();
830 for (int i = 0; i < 10; i++) {
831 if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
832 }
833 if (!this.scheduleThreadPool.isShutdown()) {
834 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
835 LOG.debug("Still running " + runnables);
836 }
837 this.evictionThread.shutdown();
838 }
839
840
841 public void clearCache() {
842 map.clear();
843 }
844
845
846
847
848
849 SortedSet<String> getCachedFileNamesForTest() {
850 SortedSet<String> fileNames = new TreeSet<String>();
851 for (BlockCacheKey cacheKey : map.keySet()) {
852 fileNames.add(cacheKey.getHfileName());
853 }
854 return fileNames;
855 }
856
857 Map<BlockType, Integer> getBlockTypeCountsForTest() {
858 Map<BlockType, Integer> counts =
859 new EnumMap<BlockType, Integer>(BlockType.class);
860 for (CachedBlock cb : map.values()) {
861 BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
862 Integer count = counts.get(blockType);
863 counts.put(blockType, (count == null ? 0 : count) + 1);
864 }
865 return counts;
866 }
867
868 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
869 Map<DataBlockEncoding, Integer> counts =
870 new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
871 for (BlockCacheKey cacheKey : map.keySet()) {
872 DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
873 Integer count = counts.get(encoding);
874 counts.put(encoding, (count == null ? 0 : count) + 1);
875 }
876 return counts;
877 }
878
879 public void setVictimCache(BucketCache handler) {
880 assert victimHandler == null;
881 victimHandler = handler;
882 }
883
884 }