1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.IOException;
22  import java.lang.ref.WeakReference;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.EnumMap;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.PriorityQueue;
31  import java.util.SortedSet;
32  import java.util.TreeSet;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.hbase.io.HeapSize;
47  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
48  import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
49  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.ClassSize;
52  import org.apache.hadoop.hbase.util.FSUtils;
53  import org.apache.hadoop.hbase.util.HasThread;
54  import org.apache.hadoop.hbase.util.Threads;
55  import org.apache.hadoop.util.StringUtils;
56  
57  import com.google.common.util.concurrent.ThreadFactoryBuilder;
58  
59  /**
60   * A block cache implementation that is memory-aware using {@link HeapSize},
61   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
62   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
63   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
64   *
65   * Contains three levels of block priority to allow for
66   * scan-resistance and in-memory families.  A block is added with an inMemory
67   * flag if necessary, otherwise a block becomes a single access priority.  Once
68   * a blocked is accessed again, it changes to multiple access.  This is used
69   * to prevent scans from thrashing the cache, adding a least-frequently-used
70   * element to the eviction algorithm.<p>
71   *
72   * Each priority is given its own chunk of the total cache to ensure
73   * fairness during eviction.  Each priority will retain close to its maximum
74   * size, however, if any priority is not using its entire chunk the others
75   * are able to grow beyond their chunk size.<p>
76   *
77   * Instantiated at a minimum with the total size and average block size.
78   * All sizes are in bytes.  The block size is not especially important as this
79   * cache is fully dynamic in its sizing of blocks.  It is only used for
80   * pre-allocating data structures and in initial heap estimation of the map.<p>
81   *
82   * The detailed constructor defines the sizes for the three priorities (they
83   * should total to the maximum size defined).  It also sets the levels that
84   * trigger and control the eviction thread.<p>
85   *
86   * The acceptable size is the cache size level which triggers the eviction
87   * process to start.  It evicts enough blocks to get the size below the
88   * minimum size specified.<p>
89   *
90   * Eviction happens in a separate thread and involves a single full-scan
91   * of the map.  It determines how many bytes must be freed to reach the minimum
92   * size, and then while scanning determines the fewest least-recently-used
93   * blocks necessary from each of the three priorities (would be 3 times bytes
94   * to free).  It then uses the priority chunk sizes to evict fairly according
95   * to the relative sizes and usage.
96   */
97  @InterfaceAudience.Private
98  public class LruBlockCache implements BlockCache, HeapSize {
99  
100   static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
103   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
104 
105   /** Default Configuration Parameters*/
106 
107   /** Backing Concurrent Map Configuration */
108   static final float DEFAULT_LOAD_FACTOR = 0.75f;
109   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
110 
111   /** Eviction thresholds */
112   static final float DEFAULT_MIN_FACTOR = 0.95f;
113   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
114 
115   /** Priority buckets */
116   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
117   static final float DEFAULT_MULTI_FACTOR = 0.50f;
118   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
119 
120   /** Statistics thread */
121   static final int statThreadPeriod = 60 * 5;
122 
123   /** Concurrent map (the cache) */
124   private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;
125 
126   /** Eviction lock (locked when eviction in process) */
127   private final ReentrantLock evictionLock = new ReentrantLock(true);
128 
129   /** Volatile boolean to track if we are in an eviction process or not */
130   private volatile boolean evictionInProgress = false;
131 
132   /** Eviction thread */
133   private final EvictionThread evictionThread;
134 
135   /** Statistics thread schedule pool (for heavy debugging, could remove) */
136   private final ScheduledExecutorService scheduleThreadPool =
137     Executors.newScheduledThreadPool(1,
138       new ThreadFactoryBuilder()
139         .setNameFormat("LRU Statistics #%d")
140         .setDaemon(true)
141         .build());
142 
143   /** Current size of cache */
144   private final AtomicLong size;
145 
146   /** Current number of cached elements */
147   private final AtomicLong elements;
148 
149   /** Cache access count (sequential ID) */
150   private final AtomicLong count;
151 
152   /** Cache statistics */
153   private final CacheStats stats;
154 
155   /** Maximum allowable size of cache (block put if size > max, evict) */
156   private long maxSize;
157 
158   /** Approximate block size */
159   private long blockSize;
160 
161   /** Acceptable size of cache (no evictions if size < acceptable) */
162   private float acceptableFactor;
163 
164   /** Minimum threshold of cache (when evicting, evict until size < min) */
165   private float minFactor;
166 
167   /** Single access bucket size */
168   private float singleFactor;
169 
170   /** Multiple access bucket size */
171   private float multiFactor;
172 
173   /** In-memory bucket size */
174   private float memoryFactor;
175 
176   /** Overhead of the structure itself */
177   private long overhead;
178 
179   /** Where to send victims (blocks evicted from the cache) */
180   private BucketCache victimHandler = null;
181 
182   /**
183    * Default constructor.  Specify maximum size and expected average block
184    * size (approximation is fine).
185    *
186    * <p>All other factors will be calculated based on defaults specified in
187    * this class.
188    * @param maxSize maximum size of cache, in bytes
189    * @param blockSize approximate size of each block, in bytes
190    */
191   public LruBlockCache(long maxSize, long blockSize) {
192     this(maxSize, blockSize, true);
193   }
194 
195   /**
196    * Constructor used for testing.  Allows disabling of the eviction thread.
197    */
198   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
199     this(maxSize, blockSize, evictionThread,
200         (int)Math.ceil(1.2*maxSize/blockSize),
201         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
202         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
203         DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR,
204         DEFAULT_MEMORY_FACTOR);
205   }
206 
207   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
208     this(maxSize, blockSize, evictionThread,
209         (int)Math.ceil(1.2*maxSize/blockSize),
210         DEFAULT_LOAD_FACTOR,
211         DEFAULT_CONCURRENCY_LEVEL,
212         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
213         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
214         DEFAULT_SINGLE_FACTOR,
215         DEFAULT_MULTI_FACTOR,
216         DEFAULT_MEMORY_FACTOR);
217   }
218 
219   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
220     this(maxSize, blockSize, true, conf);
221   }
222 
223   /**
224    * Configurable constructor.  Use this constructor if not using defaults.
225    * @param maxSize maximum size of this cache, in bytes
226    * @param blockSize expected average size of blocks, in bytes
227    * @param evictionThread whether to run evictions in a bg thread or not
228    * @param mapInitialSize initial size of backing ConcurrentHashMap
229    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
230    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
231    * @param minFactor percentage of total size that eviction will evict until
232    * @param acceptableFactor percentage of total size that triggers eviction
233    * @param singleFactor percentage of total size for single-access blocks
234    * @param multiFactor percentage of total size for multiple-access blocks
235    * @param memoryFactor percentage of total size for in-memory blocks
236    */
237   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
238       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
239       float minFactor, float acceptableFactor,
240       float singleFactor, float multiFactor, float memoryFactor) {
241     if(singleFactor + multiFactor + memoryFactor != 1) {
242       throw new IllegalArgumentException("Single, multi, and memory factors " +
243           " should total 1.0");
244     }
245     if(minFactor >= acceptableFactor) {
246       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
247     }
248     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
249       throw new IllegalArgumentException("all factors must be < 1");
250     }
251     this.maxSize = maxSize;
252     this.blockSize = blockSize;
253     map = new ConcurrentHashMap<BlockCacheKey,CachedBlock>(mapInitialSize,
254         mapLoadFactor, mapConcurrencyLevel);
255     this.minFactor = minFactor;
256     this.acceptableFactor = acceptableFactor;
257     this.singleFactor = singleFactor;
258     this.multiFactor = multiFactor;
259     this.memoryFactor = memoryFactor;
260     this.stats = new CacheStats();
261     this.count = new AtomicLong(0);
262     this.elements = new AtomicLong(0);
263     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
264     this.size = new AtomicLong(this.overhead);
265     if(evictionThread) {
266       this.evictionThread = new EvictionThread(this);
267       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
268     } else {
269       this.evictionThread = null;
270     }
271     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
272         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
273   }
274 
275   public void setMaxSize(long maxSize) {
276     this.maxSize = maxSize;
277     if(this.size.get() > acceptableSize() && !evictionInProgress) {
278       runEviction();
279     }
280   }
281 
282   // BlockCache implementation
283 
284   /**
285    * Cache the block with the specified name and buffer.
286    * <p>
287    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
288    * this can happen, for which we compare the buffer contents.
289    * @param cacheKey block's cache key
290    * @param buf block buffer
291    * @param inMemory if block is in-memory
292    */
293   @Override
294   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
295     CachedBlock cb = map.get(cacheKey);
296     if(cb != null) {
297       // compare the contents, if they are not equal, we are in big trouble
298       if (compare(buf, cb.getBuffer()) != 0) {
299         throw new RuntimeException("Cached block contents differ, which should not have happened."
300           + "cacheKey:" + cacheKey);
301       }
302       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
303       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
304       LOG.warn(msg);
305       assert false : msg;
306       return;
307     }
308     cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
309     long newSize = updateSizeMetrics(cb, false);
310     map.put(cacheKey, cb);
311     elements.incrementAndGet();
312     if(newSize > acceptableSize() && !evictionInProgress) {
313       runEviction();
314     }
315   }
316 
317   private int compare(Cacheable left, Cacheable right) {
318     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
319     left.serialize(l);
320     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
321     right.serialize(r);
322     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
323       r.array(), r.arrayOffset(), r.limit());
324   }
325 
326   /**
327    * Cache the block with the specified name and buffer.
328    * <p>
329    * It is assumed this will NEVER be called on an already cached block.  If
330    * that is done, it is assumed that you are reinserting the same exact
331    * block due to a race condition and will update the buffer but not modify
332    * the size of the cache.
333    * @param cacheKey block's cache key
334    * @param buf block buffer
335    */
336   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
337     cacheBlock(cacheKey, buf, false);
338   }
339 
340   /**
341    * Helper function that updates the local size counter and also updates any
342    * per-cf or per-blocktype metrics it can discern from given
343    * {@link CachedBlock}
344    *
345    * @param cb
346    * @param evict
347    */
348   protected long updateSizeMetrics(CachedBlock cb, boolean evict) {
349     long heapsize = cb.heapSize();
350     if (evict) {
351       heapsize *= -1;
352     }
353     return size.addAndGet(heapsize);
354   }
355 
356   /**
357    * Get the buffer of the block with the specified name.
358    * @param cacheKey block's cache key
359    * @param caching true if the caller caches blocks on cache misses
360    * @param repeat Whether this is a repeat lookup for the same block
361    *        (used to avoid double counting cache misses when doing double-check locking)
362    * @return buffer of specified cache key, or null if not in cache
363    * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType)
364    */
365   @Override
366   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
367     CachedBlock cb = map.get(cacheKey);
368     if(cb == null) {
369       if (!repeat) stats.miss(caching);
370       if (victimHandler != null)
371         return victimHandler.getBlock(cacheKey, caching, repeat);
372       return null;
373     }
374     stats.hit(caching);
375     cb.access(count.incrementAndGet());
376     return cb.getBuffer();
377   }
378 
379   /**
380    * Whether the cache contains block with specified cacheKey
381    * @param cacheKey
382    * @return true if contains the block
383    */
384   public boolean containsBlock(BlockCacheKey cacheKey) {
385     return map.containsKey(cacheKey);
386   }
387 
388   @Override
389   public boolean evictBlock(BlockCacheKey cacheKey) {
390     CachedBlock cb = map.get(cacheKey);
391     if (cb == null) return false;
392     evictBlock(cb, false);
393     return true;
394   }
395 
396   /**
397    * Evicts all blocks for a specific HFile. This is an
398    * expensive operation implemented as a linear-time search through all blocks
399    * in the cache. Ideally this should be a search in a log-access-time map.
400    *
401    * <p>
402    * This is used for evict-on-close to remove all blocks of a specific HFile.
403    *
404    * @return the number of blocks evicted
405    */
406   @Override
407   public int evictBlocksByHfileName(String hfileName) {
408     int numEvicted = 0;
409     for (BlockCacheKey key : map.keySet()) {
410       if (key.getHfileName().equals(hfileName)) {
411         if (evictBlock(key))
412           ++numEvicted;
413       }
414     }
415     if (victimHandler != null) {
416       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
417     }
418     return numEvicted;
419   }
420 
421   /**
422    * Evict the block, and it will be cached by the victim handler if exists &&
423    * block may be read again later
424    * @param block
425    * @param evictedByEvictionProcess true if the given block is evicted by
426    *          EvictionThread
427    * @return the heap size of evicted block
428    */
429   protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
430     map.remove(block.getCacheKey());
431     updateSizeMetrics(block, true);
432     elements.decrementAndGet();
433     stats.evicted();
434     if (evictedByEvictionProcess && victimHandler != null) {
435       boolean wait = getCurrentSize() < acceptableSize();
436       boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
437       victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
438           inMemory, wait);
439     }
440     return block.heapSize();
441   }
442 
443   /**
444    * Multi-threaded call to run the eviction process.
445    */
446   private void runEviction() {
447     if(evictionThread == null) {
448       evict();
449     } else {
450       evictionThread.evict();
451     }
452   }
453 
454   /**
455    * Eviction method.
456    */
457   void evict() {
458 
459     // Ensure only one eviction at a time
460     if(!evictionLock.tryLock()) return;
461 
462     try {
463       evictionInProgress = true;
464       long currentSize = this.size.get();
465       long bytesToFree = currentSize - minSize();
466 
467       if (LOG.isDebugEnabled()) {
468         LOG.debug("Block cache LRU eviction started; Attempting to free " +
469           StringUtils.byteDesc(bytesToFree) + " of total=" +
470           StringUtils.byteDesc(currentSize));
471       }
472 
473       if(bytesToFree <= 0) return;
474 
475       // Instantiate priority buckets
476       BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
477           singleSize());
478       BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
479           multiSize());
480       BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
481           memorySize());
482 
483       // Scan entire map putting into appropriate buckets
484       for(CachedBlock cachedBlock : map.values()) {
485         switch(cachedBlock.getPriority()) {
486           case SINGLE: {
487             bucketSingle.add(cachedBlock);
488             break;
489           }
490           case MULTI: {
491             bucketMulti.add(cachedBlock);
492             break;
493           }
494           case MEMORY: {
495             bucketMemory.add(cachedBlock);
496             break;
497           }
498         }
499       }
500 
501       PriorityQueue<BlockBucket> bucketQueue =
502         new PriorityQueue<BlockBucket>(3);
503 
504       bucketQueue.add(bucketSingle);
505       bucketQueue.add(bucketMulti);
506       bucketQueue.add(bucketMemory);
507 
508       int remainingBuckets = 3;
509       long bytesFreed = 0;
510 
511       BlockBucket bucket;
512       while((bucket = bucketQueue.poll()) != null) {
513         long overflow = bucket.overflow();
514         if(overflow > 0) {
515           long bucketBytesToFree = Math.min(overflow,
516             (bytesToFree - bytesFreed) / remainingBuckets);
517           bytesFreed += bucket.free(bucketBytesToFree);
518         }
519         remainingBuckets--;
520       }
521 
522       if (LOG.isDebugEnabled()) {
523         long single = bucketSingle.totalSize();
524         long multi = bucketMulti.totalSize();
525         long memory = bucketMemory.totalSize();
526         LOG.debug("Block cache LRU eviction completed; " +
527           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
528           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
529           "single=" + StringUtils.byteDesc(single) + ", " +
530           "multi=" + StringUtils.byteDesc(multi) + ", " +
531           "memory=" + StringUtils.byteDesc(memory));
532       }
533     } finally {
534       stats.evict();
535       evictionInProgress = false;
536       evictionLock.unlock();
537     }
538   }
539 
540   /**
541    * Used to group blocks into priority buckets.  There will be a BlockBucket
542    * for each priority (single, multi, memory).  Once bucketed, the eviction
543    * algorithm takes the appropriate number of elements out of each according
544    * to configuration parameters and their relatives sizes.
545    */
546   private class BlockBucket implements Comparable<BlockBucket> {
547 
548     private CachedBlockQueue queue;
549     private long totalSize = 0;
550     private long bucketSize;
551 
552     public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
553       this.bucketSize = bucketSize;
554       queue = new CachedBlockQueue(bytesToFree, blockSize);
555       totalSize = 0;
556     }
557 
558     public void add(CachedBlock block) {
559       totalSize += block.heapSize();
560       queue.add(block);
561     }
562 
563     public long free(long toFree) {
564       CachedBlock cb;
565       long freedBytes = 0;
566       while ((cb = queue.pollLast()) != null) {
567         freedBytes += evictBlock(cb, true);
568         if (freedBytes >= toFree) {
569           return freedBytes;
570         }
571       }
572       return freedBytes;
573     }
574 
575     public long overflow() {
576       return totalSize - bucketSize;
577     }
578 
579     public long totalSize() {
580       return totalSize;
581     }
582 
583     public int compareTo(BlockBucket that) {
584       if(this.overflow() == that.overflow()) return 0;
585       return this.overflow() > that.overflow() ? 1 : -1;
586     }
587 
588     @Override
589     public boolean equals(Object that) {
590       if (that == null || !(that instanceof BlockBucket)){
591         return false;
592       }
593 
594       return compareTo(( BlockBucket)that) == 0;
595     }
596 
597   }
598 
599   /**
600    * Get the maximum size of this cache.
601    * @return max size in bytes
602    */
603   public long getMaxSize() {
604     return this.maxSize;
605   }
606 
607   /**
608    * Get the current size of this cache.
609    * @return current size in bytes
610    */
611   public long getCurrentSize() {
612     return this.size.get();
613   }
614 
615   /**
616    * Get the current size of this cache.
617    * @return current size in bytes
618    */
619   public long getFreeSize() {
620     return getMaxSize() - getCurrentSize();
621   }
622 
623   /**
624    * Get the size of this cache (number of cached blocks)
625    * @return number of cached blocks
626    */
627   public long size() {
628     return this.elements.get();
629   }
630 
631   @Override
632   public long getBlockCount() {
633     return this.elements.get();
634   }
635 
636   /**
637    * Get the number of eviction runs that have occurred
638    */
639   public long getEvictionCount() {
640     return this.stats.getEvictionCount();
641   }
642 
643   /**
644    * Get the number of blocks that have been evicted during the lifetime
645    * of this cache.
646    */
647   public long getEvictedCount() {
648     return this.stats.getEvictedCount();
649   }
650 
651   EvictionThread getEvictionThread() {
652     return this.evictionThread;
653   }
654 
655   /*
656    * Eviction thread.  Sits in waiting state until an eviction is triggered
657    * when the cache size grows above the acceptable level.<p>
658    *
659    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
660    */
661   static class EvictionThread extends HasThread {
662     private WeakReference<LruBlockCache> cache;
663     private boolean go = true;
664     // flag set after enter the run method, used for test
665     private boolean enteringRun = false;
666 
667     public EvictionThread(LruBlockCache cache) {
668       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
669       setDaemon(true);
670       this.cache = new WeakReference<LruBlockCache>(cache);
671     }
672 
673     @Override
674     public void run() {
675       enteringRun = true;
676       while (this.go) {
677         synchronized(this) {
678           try {
679             this.wait();
680           } catch(InterruptedException e) {}
681         }
682         LruBlockCache cache = this.cache.get();
683         if(cache == null) break;
684         cache.evict();
685       }
686     }
687 
688     public void evict() {
689       synchronized(this) {
690         this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
691       }
692     }
693 
694     synchronized void shutdown() {
695       this.go = false;
696       this.notifyAll();
697     }
698 
699     /**
700      * Used for the test.
701      */
702     boolean isEnteringRun() {
703       return this.enteringRun;
704     }
705   }
706 
707   /*
708    * Statistics thread.  Periodically prints the cache statistics to the log.
709    */
710   static class StatisticsThread extends Thread {
711     LruBlockCache lru;
712 
713     public StatisticsThread(LruBlockCache lru) {
714       super("LruBlockCache.StatisticsThread");
715       setDaemon(true);
716       this.lru = lru;
717     }
718     @Override
719     public void run() {
720       lru.logStats();
721     }
722   }
723 
724   public void logStats() {
725     if (!LOG.isDebugEnabled()) return;
726     // Log size
727     long totalSize = heapSize();
728     long freeSize = maxSize - totalSize;
729     LruBlockCache.LOG.debug("Stats: " +
730         "total=" + StringUtils.byteDesc(totalSize) + ", " +
731         "free=" + StringUtils.byteDesc(freeSize) + ", " +
732         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
733         "blocks=" + size() +", " +
734         "accesses=" + stats.getRequestCount() + ", " +
735         "hits=" + stats.getHitCount() + ", " +
736         "hitRatio=" +
737           (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
738         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
739         "cachingHits=" + stats.getHitCachingCount() + ", " +
740         "cachingHitsRatio=" +
741           (stats.getHitCachingCount() == 0 ? "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
742         "evictions=" + stats.getEvictionCount() + ", " +
743         "evicted=" + stats.getEvictedCount() + ", " +
744         "evictedPerRun=" + stats.evictedPerEviction());
745   }
746 
747   /**
748    * Get counter statistics for this cache.
749    *
750    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
751    * of the eviction processes.
752    */
753   public CacheStats getStats() {
754     return this.stats;
755   }
756 
757   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
758       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
759       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
760       + ClassSize.OBJECT);
761 
762   // HeapSize implementation
763   public long heapSize() {
764     return getCurrentSize();
765   }
766 
767   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
768     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
769     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
770         ((long)Math.ceil(maxSize*1.2/blockSize)
771             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
772         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
773   }
774 
775   @Override
776   public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException {
777 
778     Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap(
779         FileSystem.get(conf),
780         FSUtils.getRootDir(conf));
781 
782     // quirky, but it's a compound key and this is a shortcut taken instead of
783     // creating a class that would represent only a key.
784     Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
785       new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();
786 
787     for (CachedBlock cb : map.values()) {
788       String sf = cb.getCacheKey().getHfileName();
789       Path path = sfMap.get(sf);
790       if ( path != null) {
791         BlockCacheColumnFamilySummary lookup =
792           BlockCacheColumnFamilySummary.createFromStoreFilePath(path);
793         BlockCacheColumnFamilySummary bcse = bcs.get(lookup);
794         if (bcse == null) {
795           bcse = BlockCacheColumnFamilySummary.create(lookup);
796           bcs.put(lookup,bcse);
797         }
798         bcse.incrementBlocks();
799         bcse.incrementHeapSize(cb.heapSize());
800       }
801     }
802     List<BlockCacheColumnFamilySummary> list =
803         new ArrayList<BlockCacheColumnFamilySummary>(bcs.values());
804     Collections.sort( list );
805     return list;
806   }
807 
808   // Simple calculators of sizes given factors and maxSize
809 
810   private long acceptableSize() {
811     return (long)Math.floor(this.maxSize * this.acceptableFactor);
812   }
813   private long minSize() {
814     return (long)Math.floor(this.maxSize * this.minFactor);
815   }
816   private long singleSize() {
817     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
818   }
819   private long multiSize() {
820     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
821   }
822   private long memorySize() {
823     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
824   }
825 
826   public void shutdown() {
827     if (victimHandler != null)
828       victimHandler.shutdown();
829     this.scheduleThreadPool.shutdown();
830     for (int i = 0; i < 10; i++) {
831       if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
832     }
833     if (!this.scheduleThreadPool.isShutdown()) {
834       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
835       LOG.debug("Still running " + runnables);
836     }
837     this.evictionThread.shutdown();
838   }
839 
840   /** Clears the cache. Used in tests. */
841   public void clearCache() {
842     map.clear();
843   }
844 
845   /**
846    * Used in testing. May be very inefficient.
847    * @return the set of cached file names
848    */
849   SortedSet<String> getCachedFileNamesForTest() {
850     SortedSet<String> fileNames = new TreeSet<String>();
851     for (BlockCacheKey cacheKey : map.keySet()) {
852       fileNames.add(cacheKey.getHfileName());
853     }
854     return fileNames;
855   }
856 
857   Map<BlockType, Integer> getBlockTypeCountsForTest() {
858     Map<BlockType, Integer> counts =
859         new EnumMap<BlockType, Integer>(BlockType.class);
860     for (CachedBlock cb : map.values()) {
861       BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
862       Integer count = counts.get(blockType);
863       counts.put(blockType, (count == null ? 0 : count) + 1);
864     }
865     return counts;
866   }
867 
868   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
869     Map<DataBlockEncoding, Integer> counts =
870         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
871     for (BlockCacheKey cacheKey : map.keySet()) {
872       DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
873       Integer count = counts.get(encoding);
874       counts.put(encoding, (count == null ? 0 : count) + 1);
875     }
876     return counts;
877   }
878 
879   public void setVictimCache(BucketCache handler) {
880     assert victimHandler == null;
881     victimHandler = handler;
882   }
883 
884 }