1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Objects;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory column
61   * families ({@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}; an
62   * in-memory column family is one that should be served from memory if possible):
63   * single-access, multi-access, and in-memory priority.
64   * A block is added with an in-memory priority flag if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is set; otherwise it starts at
66   * single-access priority the first time it is read into this block cache.  If the block is
67   * accessed again while in the cache, it is promoted to multi-access priority.  This
68   * delineation of blocks is used to prevent scans from thrashing the cache, effectively adding a
69   * least-frequently-used element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size; however, if any priority is not using its entire chunk, the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level that triggers the eviction
86   * process.  Eviction removes enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then, while scanning, determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (together this can be up to
93   * three times the bytes to free).  It then uses the priority chunk sizes to evict
94   * fairly according to the relative sizes and usage.
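 *
 * <p>A minimal usage sketch; the sizes are illustrative only and {@code cacheKey} and
 * {@code block} are assumed to already exist:
 * <pre>{@code
 * // 512 MB cache expecting roughly 64 KB blocks
 * LruBlockCache cache = new LruBlockCache(512L * 1024 * 1024, 64 * 1024);
 * cache.cacheBlock(cacheKey, block);                            // cached at single-access priority
 * Cacheable hit = cache.getBlock(cacheKey, true, false, true);  // caching read, not a repeat
 * cache.shutdown();
 * }</pre>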
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size down to which eviction will evict; e.g. if set to 0.8, an
104    * eviction run keeps evicting until the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
114   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
115   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
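
  // Illustrative interplay of the factors above (hypothetical 1 GB cache with the defaults
  // defined below): eviction starts once the cache grows past acceptableFactor * maxSize
  // (0.99 * 1 GB) and frees blocks until it is back under minFactor * maxSize (0.95 * 1 GB);
  // the single/multi/memory percentages then split that budget 25% / 50% / 25%.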
116 
117   /**
118    * Configuration key to force data blocks of in-memory hfiles to always stay cached
119    * (unless the in-memory blocks alone take up too much of the cache). Unlike inMemory,
120    * which is a column-family configuration, inMemoryForceMode is a cluster-wide configuration.
121    */
122   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
123 
124   /** Default Configuration Parameters */
125 
126   /** Backing Concurrent Map Configuration */
127   static final float DEFAULT_LOAD_FACTOR = 0.75f;
128   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129 
130   /** Eviction thresholds */
131   static final float DEFAULT_MIN_FACTOR = 0.95f;
132   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
133 
134   /** Priority buckets */
135   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136   static final float DEFAULT_MULTI_FACTOR = 0.50f;
137   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138 
139   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
140 
141   /** Statistics thread */
142   static final int statThreadPeriod = 60 * 5;
143   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
144   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
145 
146   /** Concurrent map (the cache) */
147   private final Map<BlockCacheKey,LruCachedBlock> map;
148 
149   /** Eviction lock (locked when eviction in process) */
150   private final ReentrantLock evictionLock = new ReentrantLock(true);
151   private final long maxBlockSize;
152 
153   /** Volatile boolean to track if we are in an eviction process or not */
154   private volatile boolean evictionInProgress = false;
155 
156   /** Eviction thread */
157   private final EvictionThread evictionThread;
158 
159   /** Statistics thread schedule pool (for heavy debugging, could remove) */
160   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
161     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
162 
163   /** Current size of cache */
164   private final AtomicLong size;
165 
166   /** Current number of cached elements */
167   private final AtomicLong elements;
168 
169   /** Cache access count (sequential ID) */
170   private final AtomicLong count;
171 
172   /** Cache statistics */
173   private final CacheStats stats;
174 
176   /** Maximum allowable size of cache (if a block put pushes the size over max, eviction runs) */
176   private long maxSize;
177 
178   /** Approximate block size */
179   private long blockSize;
180 
181   /** Acceptable size of cache (no evictions if size < acceptable) */
182   private float acceptableFactor;
183 
184   /** Minimum threshold of cache (when evicting, evict until size < min) */
185   private float minFactor;
186 
187   /** Single access bucket size */
188   private float singleFactor;
189 
190   /** Multiple access bucket size */
191   private float multiFactor;
192 
193   /** In-memory bucket size */
194   private float memoryFactor;
195 
196   /** Overhead of the structure itself */
197   private long overhead;
198 
199   /** Whether in-memory hfile's data block has higher priority when evicting */
200   private boolean forceInMemory;
201 
202   /** Where to send victims (blocks evicted/missing from the cache) */
203   private BlockCache victimHandler = null;
204 
205   /**
206    * Default constructor.  Specify maximum size and expected average block
207    * size (approximation is fine).
208    *
209    * <p>All other factors will be calculated based on defaults specified in
210    * this class.
211    * @param maxSize maximum size of cache, in bytes
212    * @param blockSize approximate size of each block, in bytes
213    */
214   public LruBlockCache(long maxSize, long blockSize) {
215     this(maxSize, blockSize, true);
216   }
217 
218   /**
219    * Constructor used for testing.  Allows disabling of the eviction thread.
220    */
221   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
222     this(maxSize, blockSize, evictionThread,
223         (int)Math.ceil(1.2*maxSize/blockSize),
224         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
225         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
226         DEFAULT_SINGLE_FACTOR,
227         DEFAULT_MULTI_FACTOR,
228         DEFAULT_MEMORY_FACTOR,
229         false,
230         DEFAULT_MAX_BLOCK_SIZE
231         );
232   }
233 
234   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
235     this(maxSize, blockSize, evictionThread,
236         (int)Math.ceil(1.2*maxSize/blockSize),
237         DEFAULT_LOAD_FACTOR,
238         DEFAULT_CONCURRENCY_LEVEL,
239         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
240         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
241         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
242         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
243         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
244         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
245         conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
246         );
247   }
248 
249   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
250     this(maxSize, blockSize, true, conf);
251   }
252 
253   /**
254    * Configurable constructor.  Use this constructor if not using defaults.
255    * @param maxSize maximum size of this cache, in bytes
256    * @param blockSize expected average size of blocks, in bytes
257    * @param evictionThread whether to run evictions in a bg thread or not
258    * @param mapInitialSize initial size of backing ConcurrentHashMap
259    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
260    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
261    * @param minFactor percentage of total size that eviction will evict until
262    * @param acceptableFactor percentage of total size that triggers eviction
263    * @param singleFactor percentage of total size for single-access blocks
264    * @param multiFactor percentage of total size for multiple-access blocks
265    * @param memoryFactor percentage of total size for in-memory blocks
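   * @param forceInMemory whether in-memory blocks get strict priority over other blocks during eviction
   * @param maxBlockSize largest block, in bytes, that will be accepted into the cache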
266    */
267   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
268       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
269       float minFactor, float acceptableFactor, float singleFactor,
270       float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {
271     this.maxBlockSize = maxBlockSize;
272     if(singleFactor + multiFactor + memoryFactor != 1 ||
273         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
274       throw new IllegalArgumentException("Single, multi, and memory factors " +
275           "should be non-negative and total 1.0");
276     }
277     if(minFactor >= acceptableFactor) {
278       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
279     }
280     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
281       throw new IllegalArgumentException("all factors must be < 1");
282     }
283     this.maxSize = maxSize;
284     this.blockSize = blockSize;
285     this.forceInMemory = forceInMemory;
286     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
287         mapLoadFactor, mapConcurrencyLevel);
288     this.minFactor = minFactor;
289     this.acceptableFactor = acceptableFactor;
290     this.singleFactor = singleFactor;
291     this.multiFactor = multiFactor;
292     this.memoryFactor = memoryFactor;
293     this.stats = new CacheStats(this.getClass().getSimpleName());
294     this.count = new AtomicLong(0);
295     this.elements = new AtomicLong(0);
296     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
297     this.size = new AtomicLong(this.overhead);
298     if(evictionThread) {
299       this.evictionThread = new EvictionThread(this);
300       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
301     } else {
302       this.evictionThread = null;
303     }
304     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
305     // every five minutes.
306     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
307         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
308   }
309 
310   @Override
311   public void setMaxSize(long maxSize) {
312     this.maxSize = maxSize;
313     if(this.size.get() > acceptableSize() && !evictionInProgress) {
314       runEviction();
315     }
316   }
317 
318   // BlockCache implementation
319 
320   /**
321    * Cache the block with the specified name and buffer.
322    * <p>
323    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
324    * this can happen, for which we compare the buffer contents.
325    * @param cacheKey block's cache key
326    * @param buf block buffer
327    * @param inMemory if block is in-memory
328    * @param cacheDataInL1 whether data blocks should be cached in the L1 tier (not used by this implementation)
329    */
330   @Override
331   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
332       final boolean cacheDataInL1) {
333 
334     if (buf.heapSize() > maxBlockSize) {
335       // If there are a lot of blocks that are too
336       // big this can make the logs way too noisy.
337       // So we log 2%
338       if (stats.failInsert() % 50 == 0) {
339         LOG.warn("Trying to cache too large a block "
340             + cacheKey.getHfileName() + " @ "
341             + cacheKey.getOffset()
342             + " is " + buf.heapSize()
343             + " which is larger than " + maxBlockSize);
344       }
345       return;
346     }
347 
348     LruCachedBlock cb = map.get(cacheKey);
349     if (cb != null) {
350       // compare the contents, if they are not equal, we are in big trouble
351       if (compare(buf, cb.getBuffer()) != 0) {
352         throw new RuntimeException("Cached block contents differ, which should not have happened."
353           + " cacheKey:" + cacheKey);
354       }
355       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
356       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
357       LOG.warn(msg);
358       return;
359     }
360     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
361     long newSize = updateSizeMetrics(cb, false);
362     map.put(cacheKey, cb);
363     long val = elements.incrementAndGet();
364     if (LOG.isTraceEnabled()) {
365       long size = map.size();
366       assertCounterSanity(size, val);
367     }
368     if (newSize > acceptableSize() && !evictionInProgress) {
369       runEviction();
370     }
371   }
372 
373   /**
374    * Sanity-checking for parity between actual block cache content and metrics.
375    * Intended only for use with TRACE level logging and -ea JVM.
376    */
377   private static void assertCounterSanity(long mapSize, long counterVal) {
378     if (counterVal < 0) {
379       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
380         ", mapSize=" + mapSize);
381       return;
382     }
383     if (mapSize < Integer.MAX_VALUE) {
384       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
385       if (pct_diff > 0.05) {
386         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
387           ", mapSize=" + mapSize);
388       }
389     }
390   }
391 
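  /**
   * Serializes both blocks and compares their byte contents. Used only to verify the rare
   * already-cached case handled above (HBASE-8547).
   */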
392   private int compare(Cacheable left, Cacheable right) {
393     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
394     left.serialize(l);
395     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
396     right.serialize(r);
397     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
398       r.array(), r.arrayOffset(), r.limit());
399   }
400 
401   /**
402    * Cache the block with the specified name and buffer.
403    * <p>
404    * @param cacheKey block's cache key
405    * @param buf block buffer
406    */
407   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
408     cacheBlock(cacheKey, buf, false, false);
409   }
410 
411   /**
412    * Helper function that updates the local size counter and also updates any
413    * per-cf or per-blocktype metrics it can discern from the given
414    * {@link LruCachedBlock}.
415    *
416    * @param cb the block being added to or removed from the cache
417    * @param evict true if the block is being evicted, false if it is being cached
418    */
419   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
420     long heapsize = cb.heapSize();
421     if (evict) {
422       heapsize *= -1;
423     }
424     return size.addAndGet(heapsize);
425   }
426 
427   /**
428    * Get the buffer of the block with the specified name.
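   * <p>Illustrative double-checked lookup (identifiers here are hypothetical):
   * <pre>{@code
   * Cacheable block = cache.getBlock(key, true, false, true);  // first lookup; a miss is counted
   * if (block == null) {
   *   // after taking the per-block load lock, re-check with repeat=true so the
   *   // second miss is not double counted
   *   block = cache.getBlock(key, true, true, true);
   * }
   * }</pre>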
429    * @param cacheKey block's cache key
430    * @param caching true if the caller caches blocks on cache misses
431    * @param repeat Whether this is a repeat lookup for the same block
432    *        (used to avoid double counting cache misses when doing double-check locking)
433    * @param updateCacheMetrics Whether to update cache metrics or not
434    * @return buffer of specified cache key, or null if not in cache
435    */
436   @Override
437   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
438       boolean updateCacheMetrics) {
439     LruCachedBlock cb = map.get(cacheKey);
440     if (cb == null) {
441       if (!repeat && updateCacheMetrics) {
442         stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
443       }
444       // If there is another block cache (the victim handler), try to read from there.
445       // However, if this is a retry (the second lookup in double-checked locking)
446       // and it is already a miss, then the L2 lookup would also be a miss.
447       if (victimHandler != null && !repeat) {
448         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
449 
450         // Promote this to L1.
451         if (result != null && caching) {
452           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
453         }
454         return result;
455       }
456       return null;
457     }
458     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
459     cb.access(count.incrementAndGet());
460     return cb.getBuffer();
461   }
462 
463   /**
464    * Whether the cache contains the block with the specified cache key.
465    * @param cacheKey the block's cache key
466    * @return true if the cache contains the block
467    */
468   public boolean containsBlock(BlockCacheKey cacheKey) {
469     return map.containsKey(cacheKey);
470   }
471 
472   @Override
473   public boolean evictBlock(BlockCacheKey cacheKey) {
474     LruCachedBlock cb = map.get(cacheKey);
475     if (cb == null) return false;
476     evictBlock(cb, false);
477     return true;
478   }
479 
480   /**
481    * Evicts all blocks for a specific HFile. This is an
482    * expensive operation implemented as a linear-time search through all blocks
483    * in the cache. Ideally this should be a search in a log-access-time map.
484    *
485    * <p>
486    * This is used for evict-on-close to remove all blocks of a specific HFile.
487    *
488    * @return the number of blocks evicted
489    */
490   @Override
491   public int evictBlocksByHfileName(String hfileName) {
492     int numEvicted = 0;
493     for (BlockCacheKey key : map.keySet()) {
494       if (key.getHfileName().equals(hfileName)) {
495         if (evictBlock(key))
496           ++numEvicted;
497       }
498     }
499     if (victimHandler != null) {
500       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
501     }
502     return numEvicted;
503   }
504 
505   /**
506    * Evict the block; if a victim handler exists and the block may be read again later,
507    * it is handed off to the victim handler.
508    * @param block the block to evict
509    * @param evictedByEvictionProcess true if the given block is evicted by the
510    *          EvictionThread
511    * @return the heap size of the evicted block
512    */
513   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
514     map.remove(block.getCacheKey());
515     updateSizeMetrics(block, true);
516     long val = elements.decrementAndGet();
517     if (LOG.isTraceEnabled()) {
518       long size = map.size();
519       assertCounterSanity(size, val);
520     }
521     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
522     if (evictedByEvictionProcess && victimHandler != null) {
523       if (victimHandler instanceof BucketCache) {
524         boolean wait = getCurrentSize() < acceptableSize();
525         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
526         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
527             inMemory, wait);
528       } else {
529         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
530       }
531     }
532     return block.heapSize();
533   }
534 
535   /**
536    * Multi-threaded call to run the eviction process; evicts inline if there is no eviction thread, otherwise wakes it.
537    */
538   private void runEviction() {
539     if(evictionThread == null) {
540       evict();
541     } else {
542       evictionThread.evict();
543     }
544   }
545 
546   /**
547    * Eviction method.
548    */
549   void evict() {
550 
551     // Ensure only one eviction at a time
552     if(!evictionLock.tryLock()) return;
553 
554     try {
555       evictionInProgress = true;
556       long currentSize = this.size.get();
557       long bytesToFree = currentSize - minSize();
558 
559       if (LOG.isTraceEnabled()) {
560         LOG.trace("Block cache LRU eviction started; Attempting to free " +
561           StringUtils.byteDesc(bytesToFree) + " of total=" +
562           StringUtils.byteDesc(currentSize));
563       }
564 
565       if(bytesToFree <= 0) return;
566 
567       // Instantiate priority buckets
568       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
569           singleSize());
570       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
571           multiSize());
572       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
573           memorySize());
574 
575       // Scan entire map putting into appropriate buckets
576       for(LruCachedBlock cachedBlock : map.values()) {
577         switch(cachedBlock.getPriority()) {
578           case SINGLE: {
579             bucketSingle.add(cachedBlock);
580             break;
581           }
582           case MULTI: {
583             bucketMulti.add(cachedBlock);
584             break;
585           }
586           case MEMORY: {
587             bucketMemory.add(cachedBlock);
588             break;
589           }
590         }
591       }
592 
593       long bytesFreed = 0;
594       if (forceInMemory || memoryFactor > 0.999f) {
595         long s = bucketSingle.totalSize();
596         long m = bucketMulti.totalSize();
597         if (bytesToFree > (s + m)) {
598           // this means we need to evict blocks in memory bucket to make room,
599           // so the single and multi buckets will be emptied
600           bytesFreed = bucketSingle.free(s);
601           bytesFreed += bucketMulti.free(m);
602           if (LOG.isTraceEnabled()) {
603             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
604               " from single and multi buckets");
605           }
606           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
607           if (LOG.isTraceEnabled()) {
608             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
609               " total from all three buckets ");
610           }
611         } else {
612           // this means there is no need to evict blocks in the memory bucket,
613           // and we try our best to keep the ratio between the single and
614           // multi buckets at 1:2
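          // Illustrative arithmetic (hypothetical numbers): with s = 400 MB, m = 700 MB and
          // bytesToFree = 300 MB, bytesRemain is 800 MB; neither 3*s <= bytesRemain nor
          // 3*m <= 2*bytesRemain holds, so the single bucket frees 400 - 800/3 ~= 134 MB and
          // the multi bucket frees the remaining ~166 MB, ending near the 1:2 target.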
615           long bytesRemain = s + m - bytesToFree;
616           if (3 * s <= bytesRemain) {
617             // single-bucket is small enough that no eviction happens for it
618             // hence all eviction goes from multi-bucket
619             bytesFreed = bucketMulti.free(bytesToFree);
620           } else if (3 * m <= 2 * bytesRemain) {
621             // multi-bucket is small enough that no eviction happens for it
622             // hence all eviction goes from single-bucket
623             bytesFreed = bucketSingle.free(bytesToFree);
624           } else {
625             // both buckets need to evict some blocks
626             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
627             if (bytesFreed < bytesToFree) {
628               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
629             }
630           }
631         }
632       } else {
633         PriorityQueue<BlockBucket> bucketQueue =
634           new PriorityQueue<BlockBucket>(3);
635 
636         bucketQueue.add(bucketSingle);
637         bucketQueue.add(bucketMulti);
638         bucketQueue.add(bucketMemory);
639 
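        // Buckets are polled in increasing order of overflow; each frees at most its own
        // overflow, capped by an equal share of what still needs to be freed. Illustrative
        // arithmetic (hypothetical numbers): with overflows of 10 MB, 40 MB and 100 MB and
        // 90 MB to free, the buckets free about 10 MB, then min(40, 80/2) = 40 MB, then 40 MB.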
640         int remainingBuckets = 3;
641 
642         BlockBucket bucket;
643         while((bucket = bucketQueue.poll()) != null) {
644           long overflow = bucket.overflow();
645           if(overflow > 0) {
646             long bucketBytesToFree = Math.min(overflow,
647                 (bytesToFree - bytesFreed) / remainingBuckets);
648             bytesFreed += bucket.free(bucketBytesToFree);
649           }
650           remainingBuckets--;
651         }
652       }
653 
654       if (LOG.isTraceEnabled()) {
655         long single = bucketSingle.totalSize();
656         long multi = bucketMulti.totalSize();
657         long memory = bucketMemory.totalSize();
658         LOG.trace("Block cache LRU eviction completed; " +
659           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
660           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
661           "single=" + StringUtils.byteDesc(single) + ", " +
662           "multi=" + StringUtils.byteDesc(multi) + ", " +
663           "memory=" + StringUtils.byteDesc(memory));
664       }
665     } finally {
666       stats.evict();
667       evictionInProgress = false;
668       evictionLock.unlock();
669     }
670   }
671 
672   @Override
673   public String toString() {
674     return Objects.toStringHelper(this)
675       .add("blockCount", getBlockCount())
676       .add("currentSize", getCurrentSize())
677       .add("freeSize", getFreeSize())
678       .add("maxSize", getMaxSize())
679       .add("heapSize", heapSize())
680       .add("minSize", minSize())
681       .add("minFactor", minFactor)
682       .add("multiSize", multiSize())
683       .add("multiFactor", multiFactor)
684       .add("singleSize", singleSize())
685       .add("singleFactor", singleFactor)
686       .toString();
687   }
688 
689   /**
690    * Used to group blocks into priority buckets.  There will be a BlockBucket
691    * for each priority (single, multi, memory).  Once bucketed, the eviction
692    * algorithm takes the appropriate number of elements out of each according
693    * to configuration parameters and their relative sizes.
694    */
695   private class BlockBucket implements Comparable<BlockBucket> {
696 
697     private final String name;
698     private LruCachedBlockQueue queue;
699     private long totalSize = 0;
700     private long bucketSize;
701 
702     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
703       this.name = name;
704       this.bucketSize = bucketSize;
705       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
706       totalSize = 0;
707     }
708 
709     public void add(LruCachedBlock block) {
710       totalSize += block.heapSize();
711       queue.add(block);
712     }
713 
714     public long free(long toFree) {
715       if (LOG.isTraceEnabled()) {
716         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
717       }
718       LruCachedBlock cb;
719       long freedBytes = 0;
720       while ((cb = queue.pollLast()) != null) {
721         freedBytes += evictBlock(cb, true);
722         if (freedBytes >= toFree) {
723           return freedBytes;
724         }
725       }
726       if (LOG.isTraceEnabled()) {
727         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
728       }
729       return freedBytes;
730     }
731 
732     public long overflow() {
733       return totalSize - bucketSize;
734     }
735 
736     public long totalSize() {
737       return totalSize;
738     }
739 
740     public int compareTo(BlockBucket that) {
741       if(this.overflow() == that.overflow()) return 0;
742       return this.overflow() > that.overflow() ? 1 : -1;
743     }
744 
745     @Override
746     public boolean equals(Object that) {
747       if (that == null || !(that instanceof BlockBucket)){
748         return false;
749       }
750       return compareTo((BlockBucket)that) == 0;
751     }
752 
753     @Override
754     public int hashCode() {
755       return Objects.hashCode(name, bucketSize, queue, totalSize);
756     }
757 
758     @Override
759     public String toString() {
760       return Objects.toStringHelper(this)
761         .add("name", name)
762         .add("totalSize", StringUtils.byteDesc(totalSize))
763         .add("bucketSize", StringUtils.byteDesc(bucketSize))
764         .toString();
765     }
766   }
767 
768   /**
769    * Get the maximum size of this cache.
770    * @return max size in bytes
771    */
772   public long getMaxSize() {
773     return this.maxSize;
774   }
775 
776   @Override
777   public long getCurrentSize() {
778     return this.size.get();
779   }
780 
781   @Override
782   public long getFreeSize() {
783     return getMaxSize() - getCurrentSize();
784   }
785 
786   @Override
787   public long size() {
788     return getMaxSize();
789   }
790 
791   @Override
792   public long getBlockCount() {
793     return this.elements.get();
794   }
795 
796   EvictionThread getEvictionThread() {
797     return this.evictionThread;
798   }
799 
800   /*
801    * Eviction thread.  Sits in waiting state until an eviction is triggered
802    * when the cache size grows above the acceptable level.<p>
803    *
804    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
805    */
806   static class EvictionThread extends HasThread {
807     private WeakReference<LruBlockCache> cache;
808     private volatile boolean go = true;
809     // flag set after entering the run method; used by tests
810     private boolean enteringRun = false;
811 
812     public EvictionThread(LruBlockCache cache) {
813       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
814       setDaemon(true);
815       this.cache = new WeakReference<LruBlockCache>(cache);
816     }
817 
818     @Override
819     public void run() {
820       enteringRun = true;
821       while (this.go) {
822         synchronized(this) {
823           try {
824             this.wait(1000 * 10 /* Don't wait forever */);
825           } catch(InterruptedException e) {
826             LOG.warn("Interrupted eviction thread ", e);
827             Thread.currentThread().interrupt();
828           }
829         }
830         LruBlockCache cache = this.cache.get();
831         if (cache == null) break;
832         cache.evict();
833       }
834     }
835 
836     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
837         justification="This is what we want")
838     public void evict() {
839       synchronized(this) {
840         this.notifyAll();
841       }
842     }
843 
844     synchronized void shutdown() {
845       this.go = false;
846       this.notifyAll();
847     }
848 
849     /**
850      * Used by tests.
851      */
852     boolean isEnteringRun() {
853       return this.enteringRun;
854     }
855   }
856 
857   /*
858    * Statistics thread.  Periodically prints the cache statistics to the log.
859    */
860   static class StatisticsThread extends Thread {
861     private final LruBlockCache lru;
862 
863     public StatisticsThread(LruBlockCache lru) {
864       super("LruBlockCacheStats");
865       setDaemon(true);
866       this.lru = lru;
867     }
868 
869     @Override
870     public void run() {
871       lru.logStats();
872     }
873   }
874 
875   public void logStats() {
876     // Log size
877     long totalSize = heapSize();
878     long freeSize = maxSize - totalSize;
879     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
880         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
881         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
882         "blockCount=" + getBlockCount() + ", " +
883         "accesses=" + stats.getRequestCount() + ", " +
884         "hits=" + stats.getHitCount() + ", " +
885         "hitRatio=" + (stats.getHitCount() == 0 ?
886           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
887         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
888         "cachingHits=" + stats.getHitCachingCount() + ", " +
889         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
890           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
891         "evictions=" + stats.getEvictionCount() + ", " +
892         "evicted=" + stats.getEvictedCount() + ", " +
893         "evictedPerRun=" + stats.evictedPerEviction());
894   }
895 
896   /**
897    * Get counter statistics for this cache.
898    *
899    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
900    * of the eviction processes.
901    */
902   public CacheStats getStats() {
903     return this.stats;
904   }
905 
906   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
907       (3 * Bytes.SIZEOF_LONG) + (10 * ClassSize.REFERENCE) +
908       (5 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
909       + ClassSize.OBJECT);
910 
911   @Override
912   public long heapSize() {
913     return getCurrentSize();
914   }
915 
916   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
917     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
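    // Fixed overhead of this object + the ConcurrentHashMap itself + one map entry per
    // expected block (the map is initially sized for ~1.2 * maxSize / blockSize entries,
    // matching the constructors above) + one map segment per concurrency level.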
918     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
919         ((long)Math.ceil(maxSize*1.2/blockSize)
920             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
921         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
922   }
923 
924   @Override
925   public Iterator<CachedBlock> iterator() {
926     final Iterator<LruCachedBlock> iterator = map.values().iterator();
927 
928     return new Iterator<CachedBlock>() {
929       private final long now = System.nanoTime();
930 
931       @Override
932       public boolean hasNext() {
933         return iterator.hasNext();
934       }
935 
936       @Override
937       public CachedBlock next() {
938         final LruCachedBlock b = iterator.next();
939         return new CachedBlock() {
940           @Override
941           public String toString() {
942             return BlockCacheUtil.toString(this, now);
943           }
944 
945           @Override
946           public BlockPriority getBlockPriority() {
947             return b.getPriority();
948           }
949 
950           @Override
951           public BlockType getBlockType() {
952             return b.getBuffer().getBlockType();
953           }
954 
955           @Override
956           public long getOffset() {
957             return b.getCacheKey().getOffset();
958           }
959 
960           @Override
961           public long getSize() {
962             return b.getBuffer().heapSize();
963           }
964 
965           @Override
966           public long getCachedTime() {
967             return b.getCachedTime();
968           }
969 
970           @Override
971           public String getFilename() {
972             return b.getCacheKey().getHfileName();
973           }
974 
975           @Override
976           public int compareTo(CachedBlock other) {
977             int diff = this.getFilename().compareTo(other.getFilename());
978             if (diff != 0) return diff;
979             diff = (int)(this.getOffset() - other.getOffset());
980             if (diff != 0) return diff;
981             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
982               throw new IllegalStateException("" + this.getCachedTime() + ", " +
983                 other.getCachedTime());
984             }
985             return (int)(other.getCachedTime() - this.getCachedTime());
986           }
987 
988           @Override
989           public int hashCode() {
990             return b.hashCode();
991           }
992 
993           @Override
994           public boolean equals(Object obj) {
995             if (obj instanceof CachedBlock) {
996               CachedBlock cb = (CachedBlock)obj;
997               return compareTo(cb) == 0;
998             } else {
999               return false;
1000             }
1001           }
1002         };
1003       }
1004 
1005       @Override
1006       public void remove() {
1007         throw new UnsupportedOperationException();
1008       }
1009     };
1010   }
1011 
1012   // Simple calculators of sizes given factors and maxSize
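  // Illustrative arithmetic with the default factors and a hypothetical 1 GB maxSize:
  //   acceptableSize() = 1 GB * 0.99          -> eviction triggers above ~0.99 GB
  //   minSize()        = 1 GB * 0.95          -> eviction frees back down to ~0.95 GB
  //   singleSize()     = 1 GB * 0.25 * 0.95   -> ~243 MB budget for single-access blocks
  //   multiSize()      = 1 GB * 0.50 * 0.95   -> ~486 MB budget for multi-access blocks
  //   memorySize()     = 1 GB * 0.25 * 0.95   -> ~243 MB budget for in-memory blocks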
1013 
1014   long acceptableSize() {
1015     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1016   }
1017   private long minSize() {
1018     return (long)Math.floor(this.maxSize * this.minFactor);
1019   }
1020   private long singleSize() {
1021     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1022   }
1023   private long multiSize() {
1024     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1025   }
1026   private long memorySize() {
1027     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1028   }
1029 
1030   public void shutdown() {
1031     if (victimHandler != null)
1032       victimHandler.shutdown();
1033     this.scheduleThreadPool.shutdown();
1034     for (int i = 0; i < 10; i++) {
1035       if (!this.scheduleThreadPool.isShutdown()) {
1036         try {
1037           Thread.sleep(10);
1038         } catch (InterruptedException e) {
1039           LOG.warn("Interrupted while sleeping");
1040           Thread.currentThread().interrupt();
1041           break;
1042         }
1043       }
1044     }
1045 
1046     if (!this.scheduleThreadPool.isShutdown()) {
1047       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1048       LOG.debug("Still running " + runnables);
1049     }
1050     this.evictionThread.shutdown();
1051   }
1052 
1053   /** Clears the cache. Used in tests. */
1054   @VisibleForTesting
1055   public void clearCache() {
1056     this.map.clear();
1057     this.elements.set(0);
1058   }
1059 
1060   /**
1061    * Used in testing. May be very inefficient.
1062    * @return the set of cached file names
1063    */
1064   @VisibleForTesting
1065   SortedSet<String> getCachedFileNamesForTest() {
1066     SortedSet<String> fileNames = new TreeSet<String>();
1067     for (BlockCacheKey cacheKey : map.keySet()) {
1068       fileNames.add(cacheKey.getHfileName());
1069     }
1070     return fileNames;
1071   }
1072 
1073   @VisibleForTesting
1074   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1075     Map<BlockType, Integer> counts =
1076         new EnumMap<BlockType, Integer>(BlockType.class);
1077     for (LruCachedBlock cb : map.values()) {
1078       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1079       Integer count = counts.get(blockType);
1080       counts.put(blockType, (count == null ? 0 : count) + 1);
1081     }
1082     return counts;
1083   }
1084 
1085   @VisibleForTesting
1086   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1087     Map<DataBlockEncoding, Integer> counts =
1088         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1089     for (LruCachedBlock block : map.values()) {
1090       DataBlockEncoding encoding =
1091               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1092       Integer count = counts.get(encoding);
1093       counts.put(encoding, (count == null ? 0 : count) + 1);
1094     }
1095     return counts;
1096   }
1097 
1098   public void setVictimCache(BlockCache handler) {
1099     assert victimHandler == null;
1100     victimHandler = handler;
1101   }
1102 
1103   @VisibleForTesting
1104   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1105     return map;
1106   }
1107 
1108   BlockCache getVictimHandler() {
1109     return this.victimHandler;
1110   }
1111 
1112   @Override
1113   public BlockCache[] getBlockCaches() {
1114     return null;
1115   }
1116 
1117   @Override
1118   public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1119     // There is no SHARED type here. Just return
1120   }
1121 }