1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory
61   * column families {@link HColumnDescriptor#setInMemory(boolean)} (an in-memory column family
62   * is a column family that should be served from memory if possible):
63   * single-access, multiple-access, and in-memory priority.
64   * A block is added with an in-memory priority flag if
65   * {@link HColumnDescriptor#isInMemory()} is set; otherwise a block becomes single-access
66   * priority the first time it is read into this block cache.  If a block is accessed again while
67   * in cache, it is marked as a multiple-access priority block.  This delineation of blocks is used
68   * to prevent scans from thrashing the cache by adding a least-frequently-used
69   * element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then while scanning determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (so, in the worst case, three times
93   * the bytes to free).  It then uses the priority chunk sizes to evict fairly according
94   * to the relative sizes and usage.
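     *
     * <p>A minimal usage sketch (the sizes and the <code>block</code> variable below are
     * illustrative only; HBase itself normally builds this cache through its cache
     * configuration rather than constructing it directly):
     * <pre>{@code
     * // 1 GB cache, expected average block size of 64 KB
     * LruBlockCache cache = new LruBlockCache(1024L * 1024 * 1024, 64 * 1024);
     * BlockCacheKey key = new BlockCacheKey("example-hfile", 0);
     * cache.cacheBlock(key, block);  // block is some Cacheable, e.g. an HFileBlock
     * Cacheable hit = cache.getBlock(key, true, false, true);
     * }</pre>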
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
104    * evicting during an eviction run till the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
114   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
115   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
116 
117   /**
118    * Configuration key to force data blocks of in-memory hfiles to always be cached
119    * (unless the in-memory blocks alone overflow the cache).  Unlike inMemory, which is a
120    * column-family configuration, inMemoryForceMode is a cluster-wide configuration.
121    */
122   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
123 
124   /** Default Configuration Parameters*/
125 
126   /** Backing Concurrent Map Configuration */
127   static final float DEFAULT_LOAD_FACTOR = 0.75f;
128   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129 
130   /** Eviction thresholds */
131   static final float DEFAULT_MIN_FACTOR = 0.95f;
132   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
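      // Illustrative arithmetic with the defaults above (example numbers, not part of the code):
      // for a 1 GB cache, eviction is triggered once heap usage passes acceptableSize() = 0.99 GB,
      // and that eviction run then frees blocks until usage drops below minSize() = 0.95 GB,
      // i.e. roughly 40 MB freed per run.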
133 
134   /** Priority buckets */
135   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136   static final float DEFAULT_MULTI_FACTOR = 0.50f;
137   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138 
139   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
140 
141   /** Statistics thread period, in seconds */
142   static final int statThreadPeriod = 60 * 5;
143 
144   /** Concurrent map (the cache) */
145   private final Map<BlockCacheKey,LruCachedBlock> map;
146 
147   /** Eviction lock (locked when eviction in process) */
148   private final ReentrantLock evictionLock = new ReentrantLock(true);
149 
150   /** Volatile boolean to track if we are in an eviction process or not */
151   private volatile boolean evictionInProgress = false;
152 
153   /** Eviction thread */
154   private final EvictionThread evictionThread;
155 
156   /** Statistics thread schedule pool (for heavy debugging, could remove) */
157   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
158     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
159 
160   /** Current size of cache */
161   private final AtomicLong size;
162 
163   /** Current number of cached elements */
164   private final AtomicLong elements;
165 
166   /** Cache access count (sequential ID) */
167   private final AtomicLong count;
168 
169   /** Cache statistics */
170   private final CacheStats stats;
171 
172   /** Maximum allowable size of cache (if a block put pushes size > max, evict) */
173   private long maxSize;
174 
175   /** Approximate block size */
176   private long blockSize;
177 
178   /** Acceptable size of cache (no evictions if size < acceptable) */
179   private float acceptableFactor;
180 
181   /** Minimum threshold of cache (when evicting, evict until size < min) */
182   private float minFactor;
183 
184   /** Single access bucket size */
185   private float singleFactor;
186 
187   /** Multiple access bucket size */
188   private float multiFactor;
189 
190   /** In-memory bucket size */
191   private float memoryFactor;
192 
193   /** Overhead of the structure itself */
194   private long overhead;
195 
196   /** Whether in-memory hfile's data block has higher priority when evicting */
197   private boolean forceInMemory;
198 
199   /** Where to send victims (blocks evicted/missing from the cache) */
200   // TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
201   private BucketCache victimHandler = null;
202 
203   /**
204    * Default constructor.  Specify maximum size and expected average block
205    * size (approximation is fine).
206    *
207    * <p>All other factors will be calculated based on defaults specified in
208    * this class.
209    * @param maxSize maximum size of cache, in bytes
210    * @param blockSize approximate size of each block, in bytes
211    */
212   public LruBlockCache(long maxSize, long blockSize) {
213     this(maxSize, blockSize, true);
214   }
215 
216   /**
217    * Constructor used for testing.  Allows disabling of the eviction thread.
218    */
219   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
220     this(maxSize, blockSize, evictionThread,
221         (int)Math.ceil(1.2*maxSize/blockSize),
222         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
223         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
224         DEFAULT_SINGLE_FACTOR,
225         DEFAULT_MULTI_FACTOR,
226         DEFAULT_MEMORY_FACTOR,
227         false
228         );
229   }
230 
231   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
232     this(maxSize, blockSize, evictionThread,
233         (int)Math.ceil(1.2*maxSize/blockSize),
234         DEFAULT_LOAD_FACTOR,
235         DEFAULT_CONCURRENCY_LEVEL,
236         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
237         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
238         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
239         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
240         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
241         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
242         );
243   }
244 
245   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
246     this(maxSize, blockSize, true, conf);
247   }
248 
249   /**
250    * Configurable constructor.  Use this constructor if not using defaults.
251    * @param maxSize maximum size of this cache, in bytes
252    * @param blockSize expected average size of blocks, in bytes
253    * @param evictionThread whether to run evictions in a bg thread or not
254    * @param mapInitialSize initial size of backing ConcurrentHashMap
255    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
256    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
257    * @param minFactor percentage of total size that eviction will evict until
258    * @param acceptableFactor percentage of total size that triggers eviction
259    * @param singleFactor percentage of total size for single-access blocks
260    * @param multiFactor percentage of total size for multiple-access blocks
261    * @param memoryFactor percentage of total size for in-memory blocks
     * @param forceInMemory whether single- and multi-access blocks are always evicted before in-memory blocks
262    */
263   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
264       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
265       float minFactor, float acceptableFactor, float singleFactor,
266       float multiFactor, float memoryFactor, boolean forceInMemory) {
267     if(singleFactor + multiFactor + memoryFactor != 1 ||
268         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
269       throw new IllegalArgumentException("Single, multi, and memory factors " +
270           "should be non-negative and total 1.0");
271     }
272     if(minFactor >= acceptableFactor) {
273       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
274     }
275     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
276       throw new IllegalArgumentException("all factors must be < 1");
277     }
278     this.maxSize = maxSize;
279     this.blockSize = blockSize;
280     this.forceInMemory = forceInMemory;
281     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
282         mapLoadFactor, mapConcurrencyLevel);
283     this.minFactor = minFactor;
284     this.acceptableFactor = acceptableFactor;
285     this.singleFactor = singleFactor;
286     this.multiFactor = multiFactor;
287     this.memoryFactor = memoryFactor;
288     this.stats = new CacheStats(this.getClass().getSimpleName());
289     this.count = new AtomicLong(0);
290     this.elements = new AtomicLong(0);
291     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
292     this.size = new AtomicLong(this.overhead);
293     if(evictionThread) {
294       this.evictionThread = new EvictionThread(this);
295       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
296     } else {
297       this.evictionThread = null;
298     }
299     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
300     // every five minutes.
301     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
302         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
303   }
304 
305   @Override
306   public void setMaxSize(long maxSize) {
307     this.maxSize = maxSize;
308     if(this.size.get() > acceptableSize() && !evictionInProgress) {
309       runEviction();
310     }
311   }
312 
313   // BlockCache implementation
314 
315   /**
316    * Cache the block with the specified name and buffer.
317    * <p>
318    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
319    * this can happen, for which we compare the buffer contents.
320    * @param cacheKey block's cache key
321    * @param buf block buffer
322    * @param inMemory if block is in-memory
323    * @param cacheDataInL1 not used by this implementation
324    */
325   @Override
326   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
327       final boolean cacheDataInL1) {
328     LruCachedBlock cb = map.get(cacheKey);
329     if (cb != null) {
330       // compare the contents, if they are not equal, we are in big trouble
331       // compare the contents; if they are not equal, we are in big trouble
332       if (compare(buf, cb.getBuffer()) != 0) {
333         throw new RuntimeException("Cached block contents differ, which should not have happened. "
334       }
335       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
336       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
337       LOG.warn(msg);
338       return;
339     }
340     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
341     long newSize = updateSizeMetrics(cb, false);
342     map.put(cacheKey, cb);
343     elements.incrementAndGet();
344     if (newSize > acceptableSize() && !evictionInProgress) {
345       runEviction();
346     }
347   }
348 
349   private int compare(Cacheable left, Cacheable right) {
350     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
351     left.serialize(l);
352     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
353     right.serialize(r);
354     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
355       r.array(), r.arrayOffset(), r.limit());
356   }
357 
358   /**
359    * Cache the block with the specified name and buffer.
360    * <p>
361    * @param cacheKey block's cache key
362    * @param buf block buffer
363    */
364   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
365     cacheBlock(cacheKey, buf, false, false);
366   }
367 
368   /**
369    * Helper function that updates the local size counter by the heap size of the given
370    * {@link LruCachedBlock}, adding it when the block is cached and subtracting it when evicted.
371    *
372    * @param cb the block being cached or evicted
373    * @param evict true if the block is being evicted
374    * @return the new total size of the cache, in bytes
375    */
376   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
377     long heapsize = cb.heapSize();
378     if (evict) {
379       heapsize *= -1;
380     }
381     return size.addAndGet(heapsize);
382   }
383 
384   /**
385    * Get the buffer of the block with the specified name.
386    * @param cacheKey block's cache key
387    * @param caching true if the caller caches blocks on cache misses
388    * @param repeat Whether this is a repeat lookup for the same block
389    *        (used to avoid double counting cache misses when doing double-check locking)
390    * @param updateCacheMetrics Whether to update cache metrics or not
391    * @return buffer of specified cache key, or null if not in cache
392    */
393   @Override
394   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
395       boolean updateCacheMetrics) {
396     LruCachedBlock cb = map.get(cacheKey);
397     if (cb == null) {
398       if (!repeat && updateCacheMetrics) stats.miss(caching);
399       if (victimHandler != null) {
400         return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
401       }
402       return null;
403     }
404     if (updateCacheMetrics) stats.hit(caching);
405     cb.access(count.incrementAndGet());
406     return cb.getBuffer();
407   }
408 
409   /**
410    * Whether the cache contains the block with the specified cacheKey
411    * @param cacheKey the block's cache key
412    * @return true if the cache contains the block
413    */
414   public boolean containsBlock(BlockCacheKey cacheKey) {
415     return map.containsKey(cacheKey);
416   }
417 
418   @Override
419   public boolean evictBlock(BlockCacheKey cacheKey) {
420     LruCachedBlock cb = map.get(cacheKey);
421     if (cb == null) return false;
422     evictBlock(cb, false);
423     return true;
424   }
425 
426   /**
427    * Evicts all blocks for a specific HFile. This is an
428    * expensive operation implemented as a linear-time search through all blocks
429    * in the cache. Ideally this should be a search in a log-access-time map.
430    *
431    * <p>
432    * This is used for evict-on-close to remove all blocks of a specific HFile.
433    *
434    * @return the number of blocks evicted
435    */
436   @Override
437   public int evictBlocksByHfileName(String hfileName) {
438     int numEvicted = 0;
439     for (BlockCacheKey key : map.keySet()) {
440       if (key.getHfileName().equals(hfileName)) {
441         if (evictBlock(key))
442           ++numEvicted;
443       }
444     }
445     if (victimHandler != null) {
446       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
447     }
448     return numEvicted;
449   }
450 
451   /**
452    * Evict the block; if a victim handler exists and the block was evicted by the eviction
453    * process (so it may still be read again later), hand the block off to the victim cache.
454    * @param block the block to evict
455    * @param evictedByEvictionProcess true if the given block is evicted by the
456    *          EvictionThread
457    * @return the heap size of the evicted block
458    */
459   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
460     map.remove(block.getCacheKey());
461     updateSizeMetrics(block, true);
462     elements.decrementAndGet();
463     stats.evicted(block.getCachedTime());
464     if (evictedByEvictionProcess && victimHandler != null) {
465       boolean wait = getCurrentSize() < acceptableSize();
466       boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
467       victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
468           inMemory, wait);
469     }
470     return block.heapSize();
471   }
472 
473   /**
474    * Trigger the eviction process; may be called from multiple threads.
475    */
476   private void runEviction() {
477     if(evictionThread == null) {
478       evict();
479     } else {
480       evictionThread.evict();
481     }
482   }
483 
484   /**
485    * Eviction method.
486    */
487   void evict() {
488 
489     // Ensure only one eviction at a time
490     if(!evictionLock.tryLock()) return;
491 
492     try {
493       evictionInProgress = true;
494       long currentSize = this.size.get();
495       long bytesToFree = currentSize - minSize();
496 
497       if (LOG.isTraceEnabled()) {
498         LOG.trace("Block cache LRU eviction started; Attempting to free " +
499           StringUtils.byteDesc(bytesToFree) + " of total=" +
500           StringUtils.byteDesc(currentSize));
501       }
502 
503       if(bytesToFree <= 0) return;
504 
505       // Instantiate priority buckets
506       BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
507       BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
508       BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
509 
510       // Scan entire map putting into appropriate buckets
511       for(LruCachedBlock cachedBlock : map.values()) {
512         switch(cachedBlock.getPriority()) {
513           case SINGLE: {
514             bucketSingle.add(cachedBlock);
515             break;
516           }
517           case MULTI: {
518             bucketMulti.add(cachedBlock);
519             break;
520           }
521           case MEMORY: {
522             bucketMemory.add(cachedBlock);
523             break;
524           }
525         }
526       }
527 
528       long bytesFreed = 0;
529       if (forceInMemory || memoryFactor > 0.999f) {
530         long s = bucketSingle.totalSize();
531         long m = bucketMulti.totalSize();
532         if (bytesToFree > (s + m)) {
533           // this means we need to evict blocks in memory bucket to make room,
534           // so the single and multi buckets will be emptied
535           bytesFreed = bucketSingle.free(s);
536           bytesFreed += bucketMulti.free(m);
537           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
538         } else {
539           // this means there is no need to evict blocks in the memory bucket, and we
540           // try our best to make the post-eviction ratio between the single bucket and
541           // the multi bucket 1:2, i.e. keep ~bytesRemain/3 in single and ~2*bytesRemain/3 in multi
542           long bytesRemain = s + m - bytesToFree;
543           if (3 * s <= bytesRemain) {
544             // single-bucket is small enough that no eviction happens for it
545             // hence all eviction goes from multi-bucket
546             bytesFreed = bucketMulti.free(bytesToFree);
547           } else if (3 * m <= 2 * bytesRemain) {
548             // multi-bucket is small enough that no eviction happens for it
549             // hence all eviction goes from single-bucket
550             bytesFreed = bucketSingle.free(bytesToFree);
551           } else {
552             // both buckets need to evict some blocks
553             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
554             if (bytesFreed < bytesToFree) {
555               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
556             }
557           }
558         }
559       } else {
560         PriorityQueue<BlockBucket> bucketQueue =
561           new PriorityQueue<BlockBucket>(3);
562 
563         bucketQueue.add(bucketSingle);
564         bucketQueue.add(bucketMulti);
565         bucketQueue.add(bucketMemory);
566 
567         int remainingBuckets = 3;
568 
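            // Buckets are polled smallest-overflow first, and each bucket frees at most
            // min(its overflow, an equal share of the bytes still to be freed), so a bucket
            // within its chunk cap is never raided to cover another bucket's overflow.
            // Illustrative numbers (not from real data, ignoring block-size granularity):
            // with bytesToFree = 30 and overflows of 5, 20 and 40, the buckets free 5, 12 and 13.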
569         BlockBucket bucket;
570         while((bucket = bucketQueue.poll()) != null) {
571           long overflow = bucket.overflow();
572           if(overflow > 0) {
573             long bucketBytesToFree = Math.min(overflow,
574                 (bytesToFree - bytesFreed) / remainingBuckets);
575             bytesFreed += bucket.free(bucketBytesToFree);
576           }
577           remainingBuckets--;
578         }
579       }
580 
581       if (LOG.isTraceEnabled()) {
582         long single = bucketSingle.totalSize();
583         long multi = bucketMulti.totalSize();
584         long memory = bucketMemory.totalSize();
585         LOG.trace("Block cache LRU eviction completed; " +
586           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
587           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
588           "single=" + StringUtils.byteDesc(single) + ", " +
589           "multi=" + StringUtils.byteDesc(multi) + ", " +
590           "memory=" + StringUtils.byteDesc(memory));
591       }
592     } finally {
593       stats.evict();
594       evictionInProgress = false;
595       evictionLock.unlock();
596     }
597   }
598 
599   /**
600    * Used to group blocks into priority buckets.  There will be a BlockBucket
601    * for each priority (single, multi, memory).  Once bucketed, the eviction
602    * algorithm takes the appropriate number of elements out of each according
603    * to configuration parameters and their relative sizes.
604    */
605   private class BlockBucket implements Comparable<BlockBucket> {
606     private LruCachedBlockQueue queue;
607     private long totalSize = 0;
608     private long bucketSize;
609 
610     public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
611       this.bucketSize = bucketSize;
612       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
613       totalSize = 0;
614     }
615 
616     public void add(LruCachedBlock block) {
617       totalSize += block.heapSize();
618       queue.add(block);
619     }
620 
621     public long free(long toFree) {
622       LruCachedBlock cb;
623       long freedBytes = 0;
624       while ((cb = queue.pollLast()) != null) {
625         freedBytes += evictBlock(cb, true);
626         if (freedBytes >= toFree) {
627           return freedBytes;
628         }
629       }
630       return freedBytes;
631     }
632 
633     public long overflow() {
634       return totalSize - bucketSize;
635     }
636 
637     public long totalSize() {
638       return totalSize;
639     }
640 
641     public int compareTo(BlockBucket that) {
642       if(this.overflow() == that.overflow()) return 0;
643       return this.overflow() > that.overflow() ? 1 : -1;
644     }
645 
646     @Override
647     public boolean equals(Object that) {
648       if (that == null || !(that instanceof BlockBucket)){
649         return false;
650       }
651       return compareTo((BlockBucket)that) == 0;
652     }
653 
654     @Override
655     public int hashCode() {
656       // Nothing distinguishing about each instance unless I pass in a 'name' or something
657       return super.hashCode();
658     }
659   }
660 
661   /**
662    * Get the maximum size of this cache.
663    * @return max size in bytes
664    */
665   public long getMaxSize() {
666     return this.maxSize;
667   }
668 
669   @Override
670   public long getCurrentSize() {
671     return this.size.get();
672   }
673 
674   @Override
675   public long getFreeSize() {
676     return getMaxSize() - getCurrentSize();
677   }
678 
679   @Override
680   public long size() {
681     return getMaxSize();
682   }
683 
684   @Override
685   public long getBlockCount() {
686     return this.elements.get();
687   }
688 
689   EvictionThread getEvictionThread() {
690     return this.evictionThread;
691   }
692 
693   /*
694    * Eviction thread.  Sits in waiting state until an eviction is triggered
695    * when the cache size grows above the acceptable level.<p>
696    *
697    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
698    */
699   static class EvictionThread extends HasThread {
700     private WeakReference<LruBlockCache> cache;
701     private volatile boolean go = true;
702     // flag set after entering the run method, used for tests
703     private boolean enteringRun = false;
704 
705     public EvictionThread(LruBlockCache cache) {
706       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
707       setDaemon(true);
708       this.cache = new WeakReference<LruBlockCache>(cache);
709     }
710 
711     @Override
712     public void run() {
713       enteringRun = true;
714       while (this.go) {
715         synchronized(this) {
716           try {
717             this.wait(1000 * 10/*Don't wait forever*/);
718           } catch(InterruptedException e) {}
719         }
720         LruBlockCache cache = this.cache.get();
721         if (cache == null) break;
722         cache.evict();
723       }
724     }
725 
726     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
727         justification="This is what we want")
728     public void evict() {
729       synchronized(this) {
730         this.notifyAll();
731       }
732     }
733 
734     synchronized void shutdown() {
735       this.go = false;
736       this.notifyAll();
737     }
738 
739     /**
740      * Used for testing.
741      */
742     boolean isEnteringRun() {
743       return this.enteringRun;
744     }
745   }
746 
747   /*
748    * Statistics thread.  Periodically prints the cache statistics to the log.
749    */
750   static class StatisticsThread extends Thread {
751     private final LruBlockCache lru;
752 
753     public StatisticsThread(LruBlockCache lru) {
754       super("LruBlockCacheStats");
755       setDaemon(true);
756       this.lru = lru;
757     }
758 
759     @Override
760     public void run() {
761       lru.logStats();
762     }
763   }
764 
765   public void logStats() {
766     // Log size
767     long totalSize = heapSize();
768     long freeSize = maxSize - totalSize;
769     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
770         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
771         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
772         "accesses=" + stats.getRequestCount() + ", " +
773         "hits=" + stats.getHitCount() + ", " +
774         "hitRatio=" + (stats.getHitCount() == 0 ?
775           "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
776         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
777         "cachingHits=" + stats.getHitCachingCount() + ", " +
778         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
779           "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
780         "evictions=" + stats.getEvictionCount() + ", " +
781         "evicted=" + stats.getEvictedCount() + ", " +
782         "evictedPerRun=" + stats.evictedPerEviction());
783   }
784 
785   /**
786    * Get counter statistics for this cache.
787    *
788    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
789    * of the eviction processes.
790    */
791   public CacheStats getStats() {
792     return this.stats;
793   }
794 
795   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
796       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
797       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
798       + ClassSize.OBJECT);
799 
800   @Override
801   public long heapSize() {
802     return getCurrentSize();
803   }
804 
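      // Rough illustration of the estimate computed below (example numbers only): with
      // maxSize = 1 GB, blockSize = 64 KB and concurrency = 16, the backing map is sized for
      // ceil(1 GB * 1.2 / 64 KB) = 19,661 entries, so the result is dominated by
      // 19,661 * CONCURRENT_HASHMAP_ENTRY plus 16 * CONCURRENT_HASHMAP_SEGMENT.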
805   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
806     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
807     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
808         ((long)Math.ceil(maxSize*1.2/blockSize)
809             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
810         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
811   }
812 
813   @Override
814   public Iterator<CachedBlock> iterator() {
815     final Iterator<LruCachedBlock> iterator = map.values().iterator();
816 
817     return new Iterator<CachedBlock>() {
818       private final long now = System.nanoTime();
819 
820       @Override
821       public boolean hasNext() {
822         return iterator.hasNext();
823       }
824 
825       @Override
826       public CachedBlock next() {
827         final LruCachedBlock b = iterator.next();
828         return new CachedBlock() {
829           @Override
830           public String toString() {
831             return BlockCacheUtil.toString(this, now);
832           }
833 
834           @Override
835           public BlockPriority getBlockPriority() {
836             return b.getPriority();
837           }
838 
839           @Override
840           public BlockType getBlockType() {
841             return b.getBuffer().getBlockType();
842           }
843 
844           @Override
845           public long getOffset() {
846             return b.getCacheKey().getOffset();
847           }
848 
849           @Override
850           public long getSize() {
851             return b.getBuffer().heapSize();
852           }
853 
854           @Override
855           public long getCachedTime() {
856             return b.getCachedTime();
857           }
858 
859           @Override
860           public String getFilename() {
861             return b.getCacheKey().getHfileName();
862           }
863 
864           @Override
865           public int compareTo(CachedBlock other) {
866             int diff = this.getFilename().compareTo(other.getFilename());
867             if (diff != 0) return diff;
868             diff = (int)(this.getOffset() - other.getOffset());
869             if (diff != 0) return diff;
870             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
871               throw new IllegalStateException("" + this.getCachedTime() + ", " +
872                 other.getCachedTime());
873             }
874             return (int)(other.getCachedTime() - this.getCachedTime());
875           }
876 
877           @Override
878           public int hashCode() {
879             return b.hashCode();
880           }
881 
882           @Override
883           public boolean equals(Object obj) {
884             if (obj instanceof CachedBlock) {
885               CachedBlock cb = (CachedBlock)obj;
886               return compareTo(cb) == 0;
887             } else {
888               return false;
889             }
890           }
891         };
892       }
893 
894       @Override
895       public void remove() {
896         throw new UnsupportedOperationException();
897       }
898     };
899   }
900 
901   // Simple calculators of sizes given factors and maxSize
902 
903   long acceptableSize() {
904     return (long)Math.floor(this.maxSize * this.acceptableFactor);
905   }
906   private long minSize() {
907     return (long)Math.floor(this.maxSize * this.minFactor);
908   }
909   private long singleSize() {
910     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
911   }
912   private long multiSize() {
913     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
914   }
915   private long memorySize() {
916     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
917   }
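
      // Note: each priority chunk above is scaled by minFactor, so (up to rounding)
      // singleSize() + multiSize() + memorySize() == minSize(); eviction thus sizes the
      // per-priority targets against the post-eviction total rather than maxSize.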
918 
919   public void shutdown() {
920     if (victimHandler != null)
921       victimHandler.shutdown();
922     this.scheduleThreadPool.shutdown();
923     for (int i = 0; i < 10; i++) {
924       if (!this.scheduleThreadPool.isShutdown()) {
925         try {
926           Thread.sleep(10);
927         } catch (InterruptedException e) {
928           LOG.warn("Interrupted while sleeping");
929           Thread.currentThread().interrupt();
930           break;
931         }
932       }
933     }
934 
935     if (!this.scheduleThreadPool.isShutdown()) {
936       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
937       LOG.debug("Still running " + runnables);
938     }
939     this.evictionThread.shutdown();
940   }
941 
942   /** Clears the cache. Used in tests. */
943   public void clearCache() {
944     this.map.clear();
945     this.elements.set(0);
946   }
947 
948   /**
949    * Used in testing. May be very inefficient.
950    * @return the set of cached file names
951    */
952   SortedSet<String> getCachedFileNamesForTest() {
953     SortedSet<String> fileNames = new TreeSet<String>();
954     for (BlockCacheKey cacheKey : map.keySet()) {
955       fileNames.add(cacheKey.getHfileName());
956     }
957     return fileNames;
958   }
959 
960   @VisibleForTesting
961   Map<BlockType, Integer> getBlockTypeCountsForTest() {
962     Map<BlockType, Integer> counts =
963         new EnumMap<BlockType, Integer>(BlockType.class);
964     for (LruCachedBlock cb : map.values()) {
965       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
966       Integer count = counts.get(blockType);
967       counts.put(blockType, (count == null ? 0 : count) + 1);
968     }
969     return counts;
970   }
971 
972   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
973     Map<DataBlockEncoding, Integer> counts =
974         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
975     for (LruCachedBlock block : map.values()) {
976       DataBlockEncoding encoding =
977               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
978       Integer count = counts.get(encoding);
979       counts.put(encoding, (count == null ? 0 : count) + 1);
980     }
981     return counts;
982   }
983 
984   public void setVictimCache(BucketCache handler) {
985     assert victimHandler == null;
986     victimHandler = handler;
987   }
988 
989   BucketCache getVictimHandler() {
990     return this.victimHandler;
991   }
992 
993   @Override
994   public BlockCache[] getBlockCaches() {
995     return null;
996   }
997 }