/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A block cache implementation that is memory-aware using {@link HeapSize},
 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
 *
 * Contains three levels of block priority to allow for scan-resistance and
 * in-memory families.  A block is added with an inMemory flag if necessary;
 * otherwise it starts at single-access priority.  Once a block is accessed
 * again, it is promoted to multi-access priority.  This prevents scans from
 * thrashing the cache by adding a least-frequently-used element to the
 * eviction algorithm.<p>
 *
 * Each priority is given its own chunk of the total cache to ensure
 * fairness during eviction.  Each priority will retain close to its maximum
 * size; however, if any priority is not using its entire chunk, the others
 * are able to grow beyond their chunk size.<p>
 *
 * Instantiated at a minimum with the total size and average block size.
 * All sizes are in bytes.  The block size is not especially important as this
 * cache is fully dynamic in its sizing of blocks.  It is only used for
 * pre-allocating data structures and in initial heap estimation of the map.<p>
 *
 * The detailed constructor defines the sizes for the three priorities (they
 * should total to the maximum size defined).  It also sets the levels that
 * trigger and control the eviction thread.<p>
 *
 * The acceptable size is the cache size level which triggers the eviction
 * process to start.  It evicts enough blocks to get the size below the
 * minimum size specified.<p>
 *
 * Eviction happens in a separate thread and involves a single full-scan
 * of the map.  It determines how many bytes must be freed to reach the minimum
 * size, and then while scanning determines the fewest least-recently-used
 * blocks necessary from each of the three priorities (would be 3 times bytes
 * to free).  It then uses the priority chunk sizes to evict fairly according
 * to the relative sizes and usage.
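 *
 * <p>A minimal usage sketch (the sizes and the key/block objects are
 * illustrative assumptions, not part of this class's contract):
 * <pre>{@code
 *   // 512 MB cache, ~64 KB average block size
 *   LruBlockCache cache = new LruBlockCache(512 * 1024 * 1024, 64 * 1024);
 *   cache.cacheBlock(cacheKey, block);    // cacheKey/block obtained elsewhere
 *   Cacheable hit = cache.getBlock(cacheKey, true, false);
 * }</pre>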
 */
@InterfaceAudience.Private
public class LruBlockCache implements ResizableBlockCache, HeapSize {

  static final Log LOG = LogFactory.getLog(LruBlockCache.class);

  static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
  static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
  static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
  static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
  static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
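
  // These factors are read from the cluster Configuration (typically
  // hbase-site.xml).  A hypothetical tuning that starts evicting at 95% of
  // capacity and frees down to 90% would set:
  //
  //   <property><name>hbase.lru.blockcache.acceptable.factor</name><value>0.95</value></property>
  //   <property><name>hbase.lru.blockcache.min.factor</name><value>0.90</value></property>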

  /**
   * Configuration key that forces data blocks of in-memory hfiles to always be
   * cached (unless the in-memory blocks alone take up too much of the cache).
   * Unlike the inMemory flag, which is a column-family configuration,
   * inMemoryForceMode is a cluster-wide configuration.
   */
  static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";

  /** Default Configuration Parameters */

  /** Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /** Eviction thresholds */
  static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /** Statistics thread */
  static final int statThreadPeriod = 60 * 5;

  /** Concurrent map (the cache) */
  private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;

  /** Eviction lock (locked when eviction in process) */
  private final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread */
  private final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat("LruStats #%d")
        .setDaemon(true)
        .build());

  /** Current size of cache */
  private final AtomicLong size;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Cache access count (sequential ID) */
  private final AtomicLong count;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Overhead of the structure itself */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /** Where to send victims (blocks evicted from the cache) */
  private BucketCache victimHandler = null;

  /**
   * Default constructor.  Specify maximum size and expected average block
   * size (approximation is fine).
   *
   * <p>All other factors will be calculated based on defaults specified in
   * this class.
   * @param maxSize maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }

  /**
   * Constructor used for testing.  Allows disabling of the eviction thread.
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
    this(maxSize, blockSize, evictionThread,
        (int)Math.ceil(1.2*maxSize/blockSize),
        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
        DEFAULT_SINGLE_FACTOR,
        DEFAULT_MULTI_FACTOR,
        DEFAULT_MEMORY_FACTOR,
        false
        );
  }

  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread,
        (int)Math.ceil(1.2*maxSize/blockSize),
        DEFAULT_LOAD_FACTOR,
        DEFAULT_CONCURRENCY_LEVEL,
        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
        );
  }

  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }

  /**
   * Configurable constructor.  Use this constructor if not using defaults.
   * @param maxSize maximum size of this cache, in bytes
   * @param blockSize expected average size of blocks, in bytes
   * @param evictionThread whether to run evictions in a bg thread or not
   * @param mapInitialSize initial size of backing ConcurrentHashMap
   * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
   * @param minFactor percentage of total size that eviction will evict until
   * @param acceptableFactor percentage of total size that triggers eviction
   * @param singleFactor percentage of total size for single-access blocks
   * @param multiFactor percentage of total size for multiple-access blocks
   * @param memoryFactor percentage of total size for in-memory blocks
   * @param forceInMemory if true, in-memory (MEMORY priority) blocks are retained
   *          while eviction drains the single- and multi-access buckets first
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
      float minFactor, float acceptableFactor, float singleFactor,
      float multiFactor, float memoryFactor, boolean forceInMemory) {
    if(singleFactor + multiFactor + memoryFactor != 1 ||
        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
      throw new IllegalArgumentException("Single, multi, and memory factors " +
          "should be non-negative and total 1.0");
    }
    if(minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    this.forceInMemory = forceInMemory;
    map = new ConcurrentHashMap<BlockCacheKey,CachedBlock>(mapInitialSize,
        mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats();
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    this.size = new AtomicLong(this.overhead);
    if(evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
  }

  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
    if(this.size.get() > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  // BlockCache implementation

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   * @param cacheKey block's cache key
   * @param buf block buffer
   * @param inMemory if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    CachedBlock cb = map.get(cacheKey);
    if(cb != null) {
      // compare the contents; if they are not equal, we are in big trouble
      if (compare(buf, cb.getBuffer()) != 0) {
        throw new RuntimeException("Cached block contents differ, which should not have happened. "
          + "cacheKey: " + cacheKey);
      }
      String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
      msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
      LOG.warn(msg);
      return;
    }
    cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    elements.incrementAndGet();
    if(newSize > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  private int compare(Cacheable left, Cacheable right) {
    ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
    left.serialize(l);
    ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
    right.serialize(r);
    return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
      r.array(), r.arrayOffset(), r.limit());
  }

  /**
   * Cache the block with the specified name and buffer, using the default
   * (non in-memory) priority.
   * <p>
   * It is assumed this will NOT be called on an already cached block; see
   * {@link #cacheBlock(BlockCacheKey, Cacheable, boolean)} for how a repeat
   * insertion of the same block is handled.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Helper function that updates the local size counter and also updates any
   * per-cf or per-blocktype metrics it can discern from given
   * {@link CachedBlock}
   *
   * @param cb the block whose heap size is added to (or subtracted from) the counter
   * @param evict true if the block is being evicted, so its size is subtracted
   */
  protected long updateSizeMetrics(CachedBlock cb, boolean evict) {
    long heapsize = cb.heapSize();
    if (evict) {
      heapsize *= -1;
    }
    return size.addAndGet(heapsize);
  }

  /**
   * Get the buffer of the block with the specified name.
   * @param cacheKey block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   *        (used to avoid double counting cache misses when doing double-check locking)
   * @return buffer of specified cache key, or null if not in cache
   * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType)
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
    CachedBlock cb = map.get(cacheKey);
    if(cb == null) {
      if (!repeat) stats.miss(caching);
      if (victimHandler != null)
        return victimHandler.getBlock(cacheKey, caching, repeat);
      return null;
    }
    stats.hit(caching);
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }

  /**
   * Whether the cache contains the block with the specified cacheKey
   * @param cacheKey block's cache key
   * @return true if the cache contains the block
   */
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    CachedBlock cb = map.get(cacheKey);
    if (cb == null) return false;
    evictBlock(cb, false);
    return true;
  }

  /**
   * Evicts all blocks for a specific HFile. This is an
   * expensive operation implemented as a linear-time search through all blocks
   * in the cache. Ideally this should be a search in a log-access-time map.
   *
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = 0;
    for (BlockCacheKey key : map.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key))
          ++numEvicted;
      }
    }
    if (victimHandler != null) {
      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
    }
    return numEvicted;
  }

  /**
   * Evict the block. If a victim handler exists and the block may be read
   * again later, the block is handed off to that victim cache.
   * @param block the block to evict
   * @param evictedByEvictionProcess true if the given block is evicted by
   *          EvictionThread
   * @return the heap size of evicted block
   */
  protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
    map.remove(block.getCacheKey());
    updateSizeMetrics(block, true);
    elements.decrementAndGet();
    stats.evicted();
    if (evictedByEvictionProcess && victimHandler != null) {
      boolean wait = getCurrentSize() < acceptableSize();
      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
          inMemory, wait);
    }
    return block.heapSize();
  }

  /**
   * Multi-threaded call to run the eviction process.
   */
  private void runEviction() {
    if(evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

  /**
   * Eviction method.
   */
  void evict() {

    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      if(bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
          memorySize());

      // Scan entire map putting into appropriate buckets
      for(CachedBlock cachedBlock : map.values()) {
        switch(cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        } else {
          // no need to evict blocks from the memory bucket; instead we try to
          // keep the post-eviction ratio between the single and multi buckets
          // at 1:2
          long bytesRemain = s + m - bytesToFree;
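          // Illustrative example with made-up numbers: if s = 300, m = 600 and
          // bytesToFree = 300, then bytesRemain = 600 and the post-eviction
          // targets are 200 (single) and 400 (multi).  Neither early-out
          // condition below holds (3*s > bytesRemain and 3*m > 2*bytesRemain),
          // so single frees s - bytesRemain/3 = 100 bytes worth of blocks and
          // multi frees the remaining 200.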
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue =
          new PriorityQueue<BlockBucket>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = 3;

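        // Buckets are polled in increasing order of overflow (bytes held above
        // the bucket's configured share).  Each bucket frees at most its own
        // overflow, capped at an equal split of what still remains to be
        // freed.  As a made-up example: with overflows of -50 (single),
        // 100 (multi) and 200 (memory) and bytesToFree = 150, single frees
        // nothing while multi and memory each free roughly 75 bytes worth of
        // blocks.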
        BlockBucket bucket;
        while((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if(overflow > 0) {
            long bucketBytesToFree = Math.min(overflow,
                (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }

      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }

  /**
   * Used to group blocks into priority buckets.  There will be a BlockBucket
   * for each priority (single, multi, memory).  Once bucketed, the eviction
   * algorithm takes the appropriate number of elements out of each according
   * to configuration parameters and their relative sizes.
   */
  private class BlockBucket implements Comparable<BlockBucket> {

    private CachedBlockQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedBlockQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(CachedBlock block) {
      totalSize += block.heapSize();
      queue.add(block);
    }

    public long free(long toFree) {
      CachedBlock cb;
      long freedBytes = 0;
      while ((cb = queue.pollLast()) != null) {
        freedBytes += evictBlock(cb, true);
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    public int compareTo(BlockBucket that) {
      if(this.overflow() == that.overflow()) return 0;
      return this.overflow() > that.overflow() ? 1 : -1;
    }

    @Override
    public boolean equals(Object that) {
      if (that == null || !(that instanceof BlockBucket)) {
        return false;
      }

      return compareTo((BlockBucket) that) == 0;
    }

  }

  /**
   * Get the maximum size of this cache.
   * @return max size in bytes
   */
  public long getMaxSize() {
    return this.maxSize;
  }

  @Override
  public long getCurrentSize() {
    return this.size.get();
  }

  @Override
  public long getFreeSize() {
    return getMaxSize() - getCurrentSize();
  }

  @Override
  public long size() {
    return this.elements.get();
  }

  @Override
  public long getBlockCount() {
    return this.elements.get();
  }

  /**
   * Get the number of eviction runs that have occurred
   */
  public long getEvictionCount() {
    return this.stats.getEvictionCount();
  }

  @Override
  public long getEvictedCount() {
    return this.stats.getEvictedCount();
  }

  EvictionThread getEvictionThread() {
    return this.evictionThread;
  }

  /*
   * Eviction thread.  Sits in waiting state until an eviction is triggered
   * when the cache size grows above the acceptable level.<p>
   *
   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
   */
  static class EvictionThread extends HasThread {
    private WeakReference<LruBlockCache> cache;
    private boolean go = true;
    // flag set after entering the run method, used for testing
    private boolean enteringRun = false;

    public EvictionThread(LruBlockCache cache) {
      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<LruBlockCache>(cache);
    }

    @Override
    public void run() {
      enteringRun = true;
      while (this.go) {
        synchronized(this) {
          try {
            this.wait();
          } catch(InterruptedException e) {}
        }
        LruBlockCache cache = this.cache.get();
        if(cache == null) break;
        cache.evict();
      }
    }

    public void evict() {
      synchronized(this) {
        this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
      }
    }

    synchronized void shutdown() {
      this.go = false;
      this.notifyAll();
    }

    /**
     * Used for testing.
     */
    boolean isEnteringRun() {
      return this.enteringRun;
    }
  }

  /*
   * Statistics thread.  Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends Thread {
    LruBlockCache lru;

    public StatisticsThread(LruBlockCache lru) {
      super("LruBlockCache.StatisticsThread");
      setDaemon(true);
      this.lru = lru;
    }
    @Override
    public void run() {
      lru.logStats();
    }
  }

  public void logStats() {
    if (!LOG.isDebugEnabled()) return;
    // Log size
    long totalSize = heapSize();
    long freeSize = maxSize - totalSize;
    LruBlockCache.LOG.debug("Total=" + StringUtils.byteDesc(totalSize) + ", " +
        "free=" + StringUtils.byteDesc(freeSize) + ", " +
        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
        "blocks=" + size() + ", " +
        "accesses=" + stats.getRequestCount() + ", " +
        "hits=" + stats.getHitCount() + ", " +
        "hitRatio=" +
          (stats.getHitCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
        "cachingHits=" + stats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" +
          (stats.getHitCachingCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
        "evictions=" + stats.getEvictionCount() + ", " +
        "evicted=" + stats.getEvictedCount() + ", " +
        "evictedPerRun=" + stats.evictedPerEviction());
  }

  /**
   * Get counter statistics for this cache.
   *
   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
   * of the eviction processes.
   */
  public CacheStats getStats() {
    return this.stats;
  }

  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
      (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
      + ClassSize.OBJECT);

  // HeapSize implementation
  public long heapSize() {
    return getCurrentSize();
  }

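  // Rough heap estimate for an empty cache: the fixed overhead of this class,
  // the ConcurrentHashMap itself, one map entry per expected block (the map is
  // sized for roughly 1.2 * maxSize / blockSize blocks), and one segment per
  // concurrency level.  This is an approximation, not an exact accounting.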
  public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
        ((long)Math.ceil(maxSize*1.2/blockSize)
            * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
        ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
  }

  @Override
  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException {

    Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap(
        FileSystem.get(conf),
        FSUtils.getRootDir(conf));

    // quirky, but it's a compound key and this is a shortcut taken instead of
    // creating a class that would represent only a key.
    Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs =
      new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>();

    for (CachedBlock cb : map.values()) {
      String sf = cb.getCacheKey().getHfileName();
      Path path = sfMap.get(sf);
      if (path != null) {
        BlockCacheColumnFamilySummary lookup =
          BlockCacheColumnFamilySummary.createFromStoreFilePath(path);
        BlockCacheColumnFamilySummary bcse = bcs.get(lookup);
        if (bcse == null) {
          bcse = BlockCacheColumnFamilySummary.create(lookup);
          bcs.put(lookup, bcse);
        }
        bcse.incrementBlocks();
        bcse.incrementHeapSize(cb.heapSize());
      }
    }
    List<BlockCacheColumnFamilySummary> list =
        new ArrayList<BlockCacheColumnFamilySummary>(bcs.values());
    Collections.sort(list);
    return list;
  }

  // Simple calculators of sizes given factors and maxSize
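  // For instance, with the defaults (minFactor 0.95, acceptableFactor 0.99,
  // single/multi/memory factors 0.25/0.50/0.25) and a 1 GB maxSize, eviction
  // is triggered above ~0.99 GB and frees down to ~0.95 GB; the single, multi
  // and memory buckets are sized at roughly 0.24 GB, 0.48 GB and 0.24 GB
  // (each bucket factor is further scaled by minFactor).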

  private long acceptableSize() {
    return (long)Math.floor(this.maxSize * this.acceptableFactor);
  }
  private long minSize() {
    return (long)Math.floor(this.maxSize * this.minFactor);
  }
  private long singleSize() {
    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
  }
  private long multiSize() {
    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
  }
  private long memorySize() {
    return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }

  public void shutdown() {
    if (victimHandler != null)
      victimHandler.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < 10; i++) {
      if (!this.scheduleThreadPool.isShutdown()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while sleeping");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }

    if (!this.scheduleThreadPool.isShutdown()) {
      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
      LOG.debug("Still running " + runnables);
    }
    if (this.evictionThread != null) {
      this.evictionThread.shutdown();
    }
  }

  /** Clears the cache. Used in tests. */
  public void clearCache() {
    map.clear();
  }

  /**
   * Used in testing. May be very inefficient.
   * @return the set of cached file names
   */
  SortedSet<String> getCachedFileNamesForTest() {
    SortedSet<String> fileNames = new TreeSet<String>();
    for (BlockCacheKey cacheKey : map.keySet()) {
      fileNames.add(cacheKey.getHfileName());
    }
    return fileNames;
  }

  Map<BlockType, Integer> getBlockTypeCountsForTest() {
    Map<BlockType, Integer> counts =
        new EnumMap<BlockType, Integer>(BlockType.class);
    for (CachedBlock cb : map.values()) {
      BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
      Integer count = counts.get(blockType);
      counts.put(blockType, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
    Map<DataBlockEncoding, Integer> counts =
        new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
    for (BlockCacheKey cacheKey : map.keySet()) {
      DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
      Integer count = counts.get(encoding);
      counts.put(encoding, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  public void setVictimCache(BucketCache handler) {
    assert victimHandler == null;
    victimHandler = handler;
  }

}