1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.lang.ref.WeakReference;
22 import java.nio.ByteBuffer;
23 import java.util.EnumMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.PriorityQueue;
28 import java.util.SortedSet;
29 import java.util.TreeSet;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 import com.google.common.base.Objects;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.io.HeapSize;
43 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.ClassSize;
47 import org.apache.hadoop.hbase.util.HasThread;
48 import org.apache.hadoop.util.StringUtils;
49 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50
51 import com.google.common.annotations.VisibleForTesting;
52 import com.google.common.util.concurrent.ThreadFactoryBuilder;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @InterfaceAudience.Private
99 @JsonIgnoreProperties({"encodingCountsForTest"})
100 public class LruBlockCache implements ResizableBlockCache, HeapSize {
101
102 private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
103
104
105
106
107
108 static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
109
110
111
112
113 static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
114
115
116
117
118 static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.hard.capacity.limit.factor";
119 static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
120 static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
121 static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
122
123
124
125
126
127
128 static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
129
130
131
132
133 static final float DEFAULT_LOAD_FACTOR = 0.75f;
134 static final int DEFAULT_CONCURRENCY_LEVEL = 16;
135
136
137 static final float DEFAULT_MIN_FACTOR = 0.95f;
138 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
139
140
141 static final float DEFAULT_SINGLE_FACTOR = 0.25f;
142 static final float DEFAULT_MULTI_FACTOR = 0.50f;
143 static final float DEFAULT_MEMORY_FACTOR = 0.25f;
144
145
146 static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
147
148 static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
149
150
151 static final int statThreadPeriod = 60 * 5;
152 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
153 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
154
155
156 private final Map<BlockCacheKey,LruCachedBlock> map;
157
158
159 private final ReentrantLock evictionLock = new ReentrantLock(true);
160 private final long maxBlockSize;
161
162
163 private volatile boolean evictionInProgress = false;
164
165
166 private final EvictionThread evictionThread;
167
168
169 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
170 new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
171
172
173 private final AtomicLong size;
174
175
176 private final AtomicLong elements;
177
178
179 private final AtomicLong count;
180
181
182 private float hardCapacityLimitFactor;
183
184
185 private final CacheStats stats;
186
187
188 private long maxSize;
189
190
191 private long blockSize;
192
193
194 private float acceptableFactor;
195
196
197 private float minFactor;
198
199
200 private float singleFactor;
201
202
203 private float multiFactor;
204
205
206 private float memoryFactor;
207
208
209 private long overhead;
210
211
212 private boolean forceInMemory;
213
214
215 private BlockCache victimHandler = null;
216
217
218
219
220
221
222
223
224
225
226 public LruBlockCache(long maxSize, long blockSize) {
227 this(maxSize, blockSize, true);
228 }
229
230
231
232
233 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
234 this(maxSize, blockSize, evictionThread,
235 (int)Math.ceil(1.2*maxSize/blockSize),
236 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
237 DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
238 DEFAULT_SINGLE_FACTOR,
239 DEFAULT_MULTI_FACTOR,
240 DEFAULT_MEMORY_FACTOR,
241 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
242 false,
243 DEFAULT_MAX_BLOCK_SIZE
244 );
245 }
246
247 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
248 this(maxSize, blockSize, evictionThread,
249 (int)Math.ceil(1.2*maxSize/blockSize),
250 DEFAULT_LOAD_FACTOR,
251 DEFAULT_CONCURRENCY_LEVEL,
252 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
253 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
254 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
255 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
256 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
257 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
258 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
259 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
260 );
261 }
262
263 public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
264 this(maxSize, blockSize, true, conf);
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
282 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
283 float minFactor, float acceptableFactor, float singleFactor,
284 float multiFactor, float memoryFactor, float hardLimitFactor,
285 boolean forceInMemory, long maxBlockSize) {
286 this.maxBlockSize = maxBlockSize;
287 if(singleFactor + multiFactor + memoryFactor != 1 ||
288 singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
289 throw new IllegalArgumentException("Single, multi, and memory factors " +
290 " should be non-negative and total 1.0");
291 }
292 if(minFactor >= acceptableFactor) {
293 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
294 }
295 if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
296 throw new IllegalArgumentException("all factors must be < 1");
297 }
298 this.maxSize = maxSize;
299 this.blockSize = blockSize;
300 this.forceInMemory = forceInMemory;
301 map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
302 mapLoadFactor, mapConcurrencyLevel);
303 this.minFactor = minFactor;
304 this.acceptableFactor = acceptableFactor;
305 this.singleFactor = singleFactor;
306 this.multiFactor = multiFactor;
307 this.memoryFactor = memoryFactor;
308 this.stats = new CacheStats(this.getClass().getSimpleName());
309 this.count = new AtomicLong(0);
310 this.elements = new AtomicLong(0);
311 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
312 this.size = new AtomicLong(this.overhead);
313 this.hardCapacityLimitFactor = hardLimitFactor;
314 if(evictionThread) {
315 this.evictionThread = new EvictionThread(this);
316 this.evictionThread.start();
317 } else {
318 this.evictionThread = null;
319 }
320
321
322 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
323 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
324 }
325
326 @Override
327 public void setMaxSize(long maxSize) {
328 this.maxSize = maxSize;
329 if(this.size.get() > acceptableSize() && !evictionInProgress) {
330 runEviction();
331 }
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346 @Override
347 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
348 final boolean cacheDataInL1) {
349
350 if (buf.heapSize() > maxBlockSize) {
351
352
353
354 if (stats.failInsert() % 50 == 0) {
355 LOG.warn("Trying to cache too large a block "
356 + cacheKey.getHfileName() + " @ "
357 + cacheKey.getOffset()
358 + " is " + buf.heapSize()
359 + " which is larger than " + maxBlockSize);
360 }
361 return;
362 }
363
364 LruCachedBlock cb = map.get(cacheKey);
365 if (cb != null) {
366
367 if (compare(buf, cb.getBuffer()) != 0) {
368 throw new RuntimeException("Cached block contents differ, which should not have happened."
369 + "cacheKey:" + cacheKey);
370 }
371 String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
372 msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
373 LOG.debug(msg);
374 return;
375 }
376 long currentSize = size.get();
377 long currentAcceptableSize = acceptableSize();
378 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
379 if (currentSize >= hardLimitSize) {
380 stats.failInsert();
381 if (LOG.isTraceEnabled()) {
382 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
383 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."
384 + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
385 + cacheKey + " into LruBlockCache.");
386 }
387 if (!evictionInProgress) {
388 runEviction();
389 }
390 return;
391 }
392 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
393 long newSize = updateSizeMetrics(cb, false);
394 map.put(cacheKey, cb);
395 long val = elements.incrementAndGet();
396 if (LOG.isTraceEnabled()) {
397 long size = map.size();
398 assertCounterSanity(size, val);
399 }
400 if (newSize > currentAcceptableSize && !evictionInProgress) {
401 runEviction();
402 }
403 }
404
405
406
407
408
409 private static void assertCounterSanity(long mapSize, long counterVal) {
410 if (counterVal < 0) {
411 LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
412 ", mapSize=" + mapSize);
413 return;
414 }
415 if (mapSize < Integer.MAX_VALUE) {
416 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
417 if (pct_diff > 0.05) {
418 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
419 ", mapSize=" + mapSize);
420 }
421 }
422 }
423
424 private int compare(Cacheable left, Cacheable right) {
425 ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
426 left.serialize(l);
427 ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
428 right.serialize(r);
429 return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
430 r.array(), r.arrayOffset(), r.limit());
431 }
432
433
434
435
436
437
438
439 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
440 cacheBlock(cacheKey, buf, false, false);
441 }
442
443
444
445
446
447
448
449
450
451 protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
452 long heapsize = cb.heapSize();
453 if (evict) {
454 heapsize *= -1;
455 }
456 return size.addAndGet(heapsize);
457 }
458
459
460
461
462
463
464
465
466
467
468 @Override
469 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
470 boolean updateCacheMetrics) {
471 LruCachedBlock cb = map.get(cacheKey);
472 if (cb == null) {
473 if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
474
475
476
477 if (victimHandler != null && !repeat) {
478 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
479
480
481 if (result != null && caching) {
482 cacheBlock(cacheKey, result,
483 }
484 return result;
485 }
486 return null;
487 }
488 if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
489 cb.access(count.incrementAndGet());
490 return cb.getBuffer();
491 }
492
493
494
495
496
497
498 public boolean containsBlock(BlockCacheKey cacheKey) {
499 return map.containsKey(cacheKey);
500 }
501
502 @Override
503 public boolean evictBlock(BlockCacheKey cacheKey) {
504 LruCachedBlock cb = map.get(cacheKey);
505 if (cb == null) return false;
506 evictBlock(cb, false);
507 return true;
508 }
509
510
511
512
513
514
515
516
517
518
519
520 @Override
521 public int evictBlocksByHfileName(String hfileName) {
522 int numEvicted = 0;
523 for (BlockCacheKey key : map.keySet()) {
524 if (key.getHfileName().equals(hfileName)) {
525 if (evictBlock(key))
526 ++numEvicted;
527 }
528 }
529 if (victimHandler != null) {
530 numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
531 }
532 return numEvicted;
533 }
534
535
536
537
538
539
540
541
542
543 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
544 map.remove(block.getCacheKey());
545 updateSizeMetrics(block, true);
546 long val = elements.decrementAndGet();
547 if (LOG.isTraceEnabled()) {
548 long size = map.size();
549 assertCounterSanity(size, val);
550 }
551 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
552 if (evictedByEvictionProcess && victimHandler != null) {
553 if (victimHandler instanceof BucketCache) {
554 boolean wait = getCurrentSize() < acceptableSize();
555 boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
556 ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
557 inMemory, wait);
558 } else {
559 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
560 }
561 }
562 return block.heapSize();
563 }
564
565
566
567
568 private void runEviction() {
569 if(evictionThread == null) {
570 evict();
571 } else {
572 evictionThread.evict();
573 }
574 }
575
576
577
578
579 void evict() {
580
581
582 if(!evictionLock.tryLock()) return;
583
584 try {
585 evictionInProgress = true;
586 long currentSize = this.size.get();
587 long bytesToFree = currentSize - minSize();
588
589 if (LOG.isTraceEnabled()) {
590 LOG.trace("Block cache LRU eviction started; Attempting to free " +
591 StringUtils.byteDesc(bytesToFree) + " of total=" +
592 StringUtils.byteDesc(currentSize));
593 }
594
595 if(bytesToFree <= 0) return;
596
597
598 BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
599 singleSize());
600 BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
601 multiSize());
602 BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
603 memorySize());
604
605
606 for(LruCachedBlock cachedBlock : map.values()) {
607 switch(cachedBlock.getPriority()) {
608 case SINGLE: {
609 bucketSingle.add(cachedBlock);
610 break;
611 }
612 case MULTI: {
613 bucketMulti.add(cachedBlock);
614 break;
615 }
616 case MEMORY: {
617 bucketMemory.add(cachedBlock);
618 break;
619 }
620 }
621 }
622
623 long bytesFreed = 0;
624 if (forceInMemory || memoryFactor > 0.999f) {
625 long s = bucketSingle.totalSize();
626 long m = bucketMulti.totalSize();
627 if (bytesToFree > (s + m)) {
628
629
630 bytesFreed = bucketSingle.free(s);
631 bytesFreed += bucketMulti.free(m);
632 if (LOG.isTraceEnabled()) {
633 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
634 " from single and multi buckets");
635 }
636 bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
637 if (LOG.isTraceEnabled()) {
638 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
639 " total from all three buckets ");
640 }
641 } else {
642
643
644
645 long bytesRemain = s + m - bytesToFree;
646 if (3 * s <= bytesRemain) {
647
648
649 bytesFreed = bucketMulti.free(bytesToFree);
650 } else if (3 * m <= 2 * bytesRemain) {
651
652
653 bytesFreed = bucketSingle.free(bytesToFree);
654 } else {
655
656 bytesFreed = bucketSingle.free(s - bytesRemain / 3);
657 if (bytesFreed < bytesToFree) {
658 bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
659 }
660 }
661 }
662 } else {
663 PriorityQueue<BlockBucket> bucketQueue =
664 new PriorityQueue<BlockBucket>(3);
665
666 bucketQueue.add(bucketSingle);
667 bucketQueue.add(bucketMulti);
668 bucketQueue.add(bucketMemory);
669
670 int remainingBuckets = 3;
671
672 BlockBucket bucket;
673 while((bucket = bucketQueue.poll()) != null) {
674 long overflow = bucket.overflow();
675 if(overflow > 0) {
676 long bucketBytesToFree = Math.min(overflow,
677 (bytesToFree - bytesFreed) / remainingBuckets);
678 bytesFreed += bucket.free(bucketBytesToFree);
679 }
680 remainingBuckets--;
681 }
682 }
683
684 if (LOG.isTraceEnabled()) {
685 long single = bucketSingle.totalSize();
686 long multi = bucketMulti.totalSize();
687 long memory = bucketMemory.totalSize();
688 LOG.trace("Block cache LRU eviction completed; " +
689 "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
690 "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
691 "single=" + StringUtils.byteDesc(single) + ", " +
692 "multi=" + StringUtils.byteDesc(multi) + ", " +
693 "memory=" + StringUtils.byteDesc(memory));
694 }
695 } finally {
696 stats.evict();
697 evictionInProgress = false;
698 evictionLock.unlock();
699 }
700 }
701
702 @Override
703 public String toString() {
704 return Objects.toStringHelper(this)
705 .add("blockCount", getBlockCount())
706 .add("currentSize", getCurrentSize())
707 .add("freeSize", getFreeSize())
708 .add("maxSize", getMaxSize())
709 .add("heapSize", heapSize())
710 .add("minSize", minSize())
711 .add("minFactor", minFactor)
712 .add("multiSize", multiSize())
713 .add("multiFactor", multiFactor)
714 .add("singleSize", singleSize())
715 .add("singleFactor", singleFactor)
716 .toString();
717 }
718
719
720
721
722
723
724
725 private class BlockBucket implements Comparable<BlockBucket> {
726
727 private final String name;
728 private LruCachedBlockQueue queue;
729 private long totalSize = 0;
730 private long bucketSize;
731
732 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
733 this.name = name;
734 this.bucketSize = bucketSize;
735 queue = new LruCachedBlockQueue(bytesToFree, blockSize);
736 totalSize = 0;
737 }
738
739 public void add(LruCachedBlock block) {
740 totalSize += block.heapSize();
741 queue.add(block);
742 }
743
744 public long free(long toFree) {
745 if (LOG.isTraceEnabled()) {
746 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
747 }
748 LruCachedBlock cb;
749 long freedBytes = 0;
750 while ((cb = queue.pollLast()) != null) {
751 freedBytes += evictBlock(cb, true);
752 if (freedBytes >= toFree) {
753 return freedBytes;
754 }
755 }
756 if (LOG.isTraceEnabled()) {
757 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
758 }
759 return freedBytes;
760 }
761
762 public long overflow() {
763 return totalSize - bucketSize;
764 }
765
766 public long totalSize() {
767 return totalSize;
768 }
769
770 public int compareTo(BlockBucket that) {
771 return Long.compare(this.overflow(), that.overflow());
772 }
773
774 @Override
775 public boolean equals(Object that) {
776 if (that == null || !(that instanceof BlockBucket)){
777 return false;
778 }
779 return compareTo((BlockBucket)that) == 0;
780 }
781
782 @Override
783 public int hashCode() {
784 return Objects.hashCode(name, bucketSize, queue, totalSize);
785 }
786
787 @Override
788 public String toString() {
789 return Objects.toStringHelper(this)
790 .add("name", name)
791 .add("totalSize", StringUtils.byteDesc(totalSize))
792 .add("bucketSize", StringUtils.byteDesc(bucketSize))
793 .toString();
794 }
795 }
796
797
798
799
800
801 public long getMaxSize() {
802 return this.maxSize;
803 }
804
805 @Override
806 public long getCurrentSize() {
807 return this.size.get();
808 }
809
810 @Override
811 public long getFreeSize() {
812 return getMaxSize() - getCurrentSize();
813 }
814
815 @Override
816 public long size() {
817 return getMaxSize();
818 }
819
820 @Override
821 public long getBlockCount() {
822 return this.elements.get();
823 }
824
825 EvictionThread getEvictionThread() {
826 return this.evictionThread;
827 }
828
829
830
831
832
833
834
835 static class EvictionThread extends HasThread {
836 private WeakReference<LruBlockCache> cache;
837 private volatile boolean go = true;
838
839 private boolean enteringRun = false;
840
841 public EvictionThread(LruBlockCache cache) {
842 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
843 setDaemon(true);
844 this.cache = new WeakReference<LruBlockCache>(cache);
845 }
846
847 @Override
848 public void run() {
849 enteringRun = true;
850 while (this.go) {
851 synchronized(this) {
852 try {
853 this.wait(1000 * 10
854 } catch(InterruptedException e) {
855 LOG.warn("Interrupted eviction thread ", e);
856 Thread.currentThread().interrupt();
857 }
858 }
859 LruBlockCache cache = this.cache.get();
860 if (cache == null) break;
861 cache.evict();
862 }
863 }
864
865 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
866 justification="This is what we want")
867 public void evict() {
868 synchronized(this) {
869 this.notifyAll();
870 }
871 }
872
873 synchronized void shutdown() {
874 this.go = false;
875 this.notifyAll();
876 }
877
878
879
880
881 boolean isEnteringRun() {
882 return this.enteringRun;
883 }
884 }
885
886
887
888
889 static class StatisticsThread extends Thread {
890 private final LruBlockCache lru;
891
892 public StatisticsThread(LruBlockCache lru) {
893 super("LruBlockCacheStats");
894 setDaemon(true);
895 this.lru = lru;
896 }
897
898 @Override
899 public void run() {
900 lru.logStats();
901 }
902 }
903
904 public void logStats() {
905
906 long totalSize = heapSize();
907 long freeSize = maxSize - totalSize;
908 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
909 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
910 "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
911 "blockCount=" + getBlockCount() + ", " +
912 "accesses=" + stats.getRequestCount() + ", " +
913 "hits=" + stats.getHitCount() + ", " +
914 "hitRatio=" + (stats.getHitCount() == 0 ?
915 "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
916 "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
917 "cachingHits=" + stats.getHitCachingCount() + ", " +
918 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
919 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
920 "evictions=" + stats.getEvictionCount() + ", " +
921 "evicted=" + stats.getEvictedCount() + ", " +
922 "evictedPerRun=" + stats.evictedPerEviction());
923 }
924
925
926
927
928
929
930
931 public CacheStats getStats() {
932 return this.stats;
933 }
934
935 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
936 (4 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
937 (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
938 + ClassSize.OBJECT);
939
940 @Override
941 public long heapSize() {
942 return getCurrentSize();
943 }
944
945 public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
946
947 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
948 ((long)Math.ceil(maxSize*1.2/blockSize)
949 * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
950 ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
951 }
952
953 @Override
954 public Iterator<CachedBlock> iterator() {
955 final Iterator<LruCachedBlock> iterator = map.values().iterator();
956
957 return new Iterator<CachedBlock>() {
958 private final long now = System.nanoTime();
959
960 @Override
961 public boolean hasNext() {
962 return iterator.hasNext();
963 }
964
965 @Override
966 public CachedBlock next() {
967 final LruCachedBlock b = iterator.next();
968 return new CachedBlock() {
969 @Override
970 public String toString() {
971 return BlockCacheUtil.toString(this, now);
972 }
973
974 @Override
975 public BlockPriority getBlockPriority() {
976 return b.getPriority();
977 }
978
979 @Override
980 public BlockType getBlockType() {
981 return b.getBuffer().getBlockType();
982 }
983
984 @Override
985 public long getOffset() {
986 return b.getCacheKey().getOffset();
987 }
988
989 @Override
990 public long getSize() {
991 return b.getBuffer().heapSize();
992 }
993
994 @Override
995 public long getCachedTime() {
996 return b.getCachedTime();
997 }
998
999 @Override
1000 public String getFilename() {
1001 return b.getCacheKey().getHfileName();
1002 }
1003
1004 @Override
1005 public int compareTo(CachedBlock other) {
1006 int diff = this.getFilename().compareTo(other.getFilename());
1007 if (diff != 0) return diff;
1008 diff = Long.compare(this.getOffset(), other.getOffset());
1009 if (diff != 0) return diff;
1010 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1011 throw new IllegalStateException("" + this.getCachedTime() + ", " +
1012 other.getCachedTime());
1013 }
1014 return Long.compare(other.getCachedTime(), this.getCachedTime());
1015 }
1016
1017 @Override
1018 public int hashCode() {
1019 return b.hashCode();
1020 }
1021
1022 @Override
1023 public boolean equals(Object obj) {
1024 if (obj instanceof CachedBlock) {
1025 CachedBlock cb = (CachedBlock)obj;
1026 return compareTo(cb) == 0;
1027 } else {
1028 return false;
1029 }
1030 }
1031 };
1032 }
1033
1034 @Override
1035 public void remove() {
1036 throw new UnsupportedOperationException();
1037 }
1038 };
1039 }
1040
1041
1042
1043 long acceptableSize() {
1044 return (long)Math.floor(this.maxSize * this.acceptableFactor);
1045 }
1046 private long minSize() {
1047 return (long)Math.floor(this.maxSize * this.minFactor);
1048 }
1049 private long singleSize() {
1050 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1051 }
1052 private long multiSize() {
1053 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1054 }
1055 private long memorySize() {
1056 return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1057 }
1058
1059 public void shutdown() {
1060 if (victimHandler != null)
1061 victimHandler.shutdown();
1062 this.scheduleThreadPool.shutdown();
1063 for (int i = 0; i < 10; i++) {
1064 if (!this.scheduleThreadPool.isShutdown()) {
1065 try {
1066 Thread.sleep(10);
1067 } catch (InterruptedException e) {
1068 LOG.warn("Interrupted while sleeping");
1069 Thread.currentThread().interrupt();
1070 break;
1071 }
1072 }
1073 }
1074
1075 if (!this.scheduleThreadPool.isShutdown()) {
1076 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1077 LOG.debug("Still running " + runnables);
1078 }
1079 this.evictionThread.shutdown();
1080 }
1081
1082
1083 @VisibleForTesting
1084 public void clearCache() {
1085 this.map.clear();
1086 this.elements.set(0);
1087 }
1088
1089
1090
1091
1092
1093 @VisibleForTesting
1094 SortedSet<String> getCachedFileNamesForTest() {
1095 SortedSet<String> fileNames = new TreeSet<String>();
1096 for (BlockCacheKey cacheKey : map.keySet()) {
1097 fileNames.add(cacheKey.getHfileName());
1098 }
1099 return fileNames;
1100 }
1101
1102 @VisibleForTesting
1103 Map<BlockType, Integer> getBlockTypeCountsForTest() {
1104 Map<BlockType, Integer> counts =
1105 new EnumMap<BlockType, Integer>(BlockType.class);
1106 for (LruCachedBlock cb : map.values()) {
1107 BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1108 Integer count = counts.get(blockType);
1109 counts.put(blockType, (count == null ? 0 : count) + 1);
1110 }
1111 return counts;
1112 }
1113
1114 @VisibleForTesting
1115 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1116 Map<DataBlockEncoding, Integer> counts =
1117 new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1118 for (LruCachedBlock block : map.values()) {
1119 DataBlockEncoding encoding =
1120 ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1121 Integer count = counts.get(encoding);
1122 counts.put(encoding, (count == null ? 0 : count) + 1);
1123 }
1124 return counts;
1125 }
1126
1127 public void setVictimCache(BlockCache handler) {
1128 assert victimHandler == null;
1129 victimHandler = handler;
1130 }
1131
1132 @VisibleForTesting
1133 Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1134 return map;
1135 }
1136
1137 BlockCache getVictimHandler() {
1138 return this.victimHandler;
1139 }
1140
1141 @Override
1142 public BlockCache[] getBlockCaches() {
1143 return null;
1144 }
1145 }