View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Objects;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory families 
61   * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 
62   * family is a column family that should be served from memory if possible):
63   * single-access, multiple-accesses, and in-memory priority.
64   * A block is added with an in-memory priority flag if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
66   *  single access priority the first time it is read into this block cache.  If a block is
67   *  accessed again while in cache, it is marked as a multiple access priority block.  This
68   *  delineation of blocks is used to prevent scans from thrashing the cache adding a 
69   *  least-frequently-used element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then while scanning determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (would be 3 times bytes
93   * to free).  It then uses the priority chunk sizes to evict fairly according
94   * to the relative sizes and usage.
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
104    * evicting during an eviction run till the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
114   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
115   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
116 
/**
 * Configuration key that forces data blocks of in-memory hfiles to stay
 * cached (unless even the in-memory blocks exceed their own share of the
 * cache). Unlike the per-column-family inMemory setting, inMemoryForceMode
 * is a cluster-wide configuration.
 */
122   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
123 
124   /** Default Configuration Parameters*/
125 
126   /** Backing Concurrent Map Configuration */
127   static final float DEFAULT_LOAD_FACTOR = 0.75f;
128   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129 
130   /** Eviction thresholds */
131   static final float DEFAULT_MIN_FACTOR = 0.95f;
132   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
133 
134   /** Priority buckets */
135   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136   static final float DEFAULT_MULTI_FACTOR = 0.50f;
137   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138 
139   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
140 
141   /** Statistics thread */
142   static final int statThreadPeriod = 60 * 5;
143   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
144   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
145 
146   /** Concurrent map (the cache) */
147   private final Map<BlockCacheKey,LruCachedBlock> map;
148 
149   /** Eviction lock (locked when eviction in process) */
150   private final ReentrantLock evictionLock = new ReentrantLock(true);
151   private final long maxBlockSize;
152 
153   /** Volatile boolean to track if we are in an eviction process or not */
154   private volatile boolean evictionInProgress = false;
155 
156   /** Eviction thread */
157   private final EvictionThread evictionThread;
158 
159   /** Statistics thread schedule pool (for heavy debugging, could remove) */
160   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
161     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
162 
163   /** Current size of cache */
164   private final AtomicLong size;
165 
166   /** Current number of cached elements */
167   private final AtomicLong elements;
168 
169   /** Cache access count (sequential ID) */
170   private final AtomicLong count;
171 
172   /** Cache statistics */
173   private final CacheStats stats;
174 
175   /** Maximum allowable size of cache (block put if size > max, evict) */
176   private long maxSize;
177 
178   /** Approximate block size */
179   private long blockSize;
180 
181   /** Acceptable size of cache (no evictions if size < acceptable) */
182   private float acceptableFactor;
183 
184   /** Minimum threshold of cache (when evicting, evict until size < min) */
185   private float minFactor;
186 
187   /** Single access bucket size */
188   private float singleFactor;
189 
190   /** Multiple access bucket size */
191   private float multiFactor;
192 
193   /** In-memory bucket size */
194   private float memoryFactor;
195 
196   /** Overhead of the structure itself */
197   private long overhead;
198 
199   /** Whether in-memory hfile's data block has higher priority when evicting */
200   private boolean forceInMemory;
201 
202   /** Where to send victims (blocks evicted/missing from the cache) */
203   private BlockCache victimHandler = null;
204 
/**
 * Default constructor.  Specify maximum size and expected average block
 * size (approximation is fine).
 *
 * <p>All other factors will be calculated based on defaults specified in
 * this class.
 * @param maxSize maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 */
public LruBlockCache(long maxSize, long blockSize) {
  // Delegates with the eviction thread enabled (the production default).
  this(maxSize, blockSize, true);
}
217 
/**
 * Constructor used for testing.  Allows disabling of the eviction thread.
 * All tuning factors come from the class defaults.
 * @param maxSize maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param evictionThread whether to run evictions in a background thread
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
  // Map is presized to ~1.2x the expected block count so the backing
  // ConcurrentHashMap rarely has to resize.
  this(maxSize, blockSize, evictionThread,
      (int)Math.ceil(1.2*maxSize/blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
      DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR,
      DEFAULT_MULTI_FACTOR,
      DEFAULT_MEMORY_FACTOR,
      false, // forceInMemory off for tests
      DEFAULT_MAX_BLOCK_SIZE
      );
}
233 
/**
 * Constructor that reads the eviction/priority tuning factors from the
 * given {@link Configuration}, falling back to the class defaults for any
 * key that is unset.
 * @param maxSize maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param evictionThread whether to run evictions in a background thread
 * @param conf configuration supplying the hbase.lru.* tuning keys
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
  this(maxSize, blockSize, evictionThread,
      (int)Math.ceil(1.2*maxSize/blockSize),
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
      );
}
248 
/**
 * Convenience constructor: configuration-driven factors with the eviction
 * thread enabled.
 * @param maxSize maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param conf configuration supplying the hbase.lru.* tuning keys
 */
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
  this(maxSize, blockSize, true, conf);
}
252 
/**
 * Configurable constructor.  Use this constructor if not using defaults.
 * @param maxSize maximum size of this cache, in bytes
 * @param blockSize expected average size of blocks, in bytes
 * @param evictionThread whether to run evictions in a bg thread or not
 * @param mapInitialSize initial size of backing ConcurrentHashMap
 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
 * @param mapConcurrencyLevel initial concurrency factor for backing CHM
 * @param minFactor percentage of total size that eviction will evict until
 * @param acceptableFactor percentage of total size that triggers eviction
 * @param singleFactor percentage of total size for single-access blocks
 * @param multiFactor percentage of total size for multiple-access blocks
 * @param memoryFactor percentage of total size for in-memory blocks
 * @param forceInMemory whether in-memory blocks get strict retention priority
 * @param maxBlockSize largest block heap size this cache will accept
 * @throws IllegalArgumentException if the factor parameters are inconsistent
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
    int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
    float minFactor, float acceptableFactor, float singleFactor,
    float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {
  this.maxBlockSize = maxBlockSize;
  // NOTE(review): exact float equality — the three factors must sum to
  // exactly 1.0f, so combinations that only sum to 1.0 after rounding
  // (e.g. 0.1f + 0.2f + 0.7f) may be rejected. Confirm intended.
  if(singleFactor + multiFactor + memoryFactor != 1 ||
      singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
    throw new IllegalArgumentException("Single, multi, and memory factors " +
        " should be non-negative and total 1.0");
  }
  if(minFactor >= acceptableFactor) {
    throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
  }
  if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
    throw new IllegalArgumentException("all factors must be < 1");
  }
  this.maxSize = maxSize;
  this.blockSize = blockSize;
  this.forceInMemory = forceInMemory;
  map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
      mapLoadFactor, mapConcurrencyLevel);
  this.minFactor = minFactor;
  this.acceptableFactor = acceptableFactor;
  this.singleFactor = singleFactor;
  this.multiFactor = multiFactor;
  this.memoryFactor = memoryFactor;
  this.stats = new CacheStats(this.getClass().getSimpleName());
  this.count = new AtomicLong(0);
  this.elements = new AtomicLong(0);
  // The cache's own bookkeeping structures are charged against the size
  // counter up front, so "size" always includes this fixed overhead.
  this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
  this.size = new AtomicLong(this.overhead);
  if(evictionThread) {
    // Starting a thread from a constructor leaks a partially-constructed
    // 'this'; tolerated here and flagged for FindBugs below.
    this.evictionThread = new EvictionThread(this);
    this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
  } else {
    this.evictionThread = null;
  }
  // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
  // every five minutes.
  this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
      statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
309 
310   @Override
311   public void setMaxSize(long maxSize) {
312     this.maxSize = maxSize;
313     if(this.size.get() > acceptableSize() && !evictionInProgress) {
314       runEviction();
315     }
316   }
317 
318   // BlockCache implementation
319 
320   /**
321    * Cache the block with the specified name and buffer.
322    * <p>
323    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
324    * this can happen, for which we compare the buffer contents.
325    * @param cacheKey block's cache key
326    * @param buf block buffer
327    * @param inMemory if block is in-memory
328    * @param cacheDataInL1
329    */
330   @Override
331   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
332       final boolean cacheDataInL1) {
333 
334     if (buf.heapSize() > maxBlockSize) {
335       // If there are a lot of blocks that are too
336       // big this can make the logs way too noisy.
337       // So we log 2%
338       if (stats.failInsert() % 50 == 0) {
339         LOG.warn("Trying to cache too large a block "
340             + cacheKey.getHfileName() + " @ "
341             + cacheKey.getOffset()
342             + " is " + buf.heapSize()
343             + " which is larger than " + maxBlockSize);
344       }
345       return;
346     }
347 
348     LruCachedBlock cb = map.get(cacheKey);
349     if (cb != null) {
350       // compare the contents, if they are not equal, we are in big trouble
351       if (compare(buf, cb.getBuffer()) != 0) {
352         throw new RuntimeException("Cached block contents differ, which should not have happened."
353           + "cacheKey:" + cacheKey);
354       }
355       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
356       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
357       LOG.warn(msg);
358       return;
359     }
360     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
361     long newSize = updateSizeMetrics(cb, false);
362     map.put(cacheKey, cb);
363     long val = elements.incrementAndGet();
364     if (LOG.isTraceEnabled()) {
365       long size = map.size();
366       assertCounterSanity(size, val);
367     }
368     if (newSize > acceptableSize() && !evictionInProgress) {
369       runEviction();
370     }
371   }
372 
373   /**
374    * Sanity-checking for parity between actual block cache content and metrics.
375    * Intended only for use with TRACE level logging and -ea JVM.
376    */
377   private static void assertCounterSanity(long mapSize, long counterVal) {
378     if (counterVal < 0) {
379       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
380         ", mapSize=" + mapSize);
381       return;
382     }
383     if (mapSize < Integer.MAX_VALUE) {
384       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
385       if (pct_diff > 0.05) {
386         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
387           ", mapSize=" + mapSize);
388       }
389     }
390   }
391 
392   private int compare(Cacheable left, Cacheable right) {
393     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
394     left.serialize(l);
395     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
396     right.serialize(r);
397     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
398       r.array(), r.arrayOffset(), r.limit());
399   }
400 
/**
 * Cache the block with the specified name and buffer.
 * <p>
 * Delegates with non-in-memory priority and no L1 pinning.
 * @param cacheKey block's cache key
 * @param buf block buffer
 */
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
  cacheBlock(cacheKey, buf, false, false);
}
410 
411   /**
412    * Helper function that updates the local size counter and also updates any
413    * per-cf or per-blocktype metrics it can discern from given
414    * {@link LruCachedBlock}
415    *
416    * @param cb
417    * @param evict
418    */
419   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
420     long heapsize = cb.heapSize();
421     if (evict) {
422       heapsize *= -1;
423     }
424     return size.addAndGet(heapsize);
425   }
426 
427   /**
428    * Get the buffer of the block with the specified name.
429    * @param cacheKey block's cache key
430    * @param caching true if the caller caches blocks on cache misses
431    * @param repeat Whether this is a repeat lookup for the same block
432    *        (used to avoid double counting cache misses when doing double-check locking)
433    * @param updateCacheMetrics Whether to update cache metrics or not
434    * @return buffer of specified cache key, or null if not in cache
435    */
436   @Override
437   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
438       boolean updateCacheMetrics) {
439     LruCachedBlock cb = map.get(cacheKey);
440     if (cb == null) {
441       if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
442       // If there is another block cache then try and read there.
443       // However if this is a retry ( second time in double checked locking )
444       // And it's already a miss then the l2 will also be a miss.
445       if (victimHandler != null && !repeat) {
446         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
447 
448         // Promote this to L1.
449         if (result != null && caching) {
450           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
451         }
452         return result;
453       }
454       return null;
455     }
456     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
457     cb.access(count.incrementAndGet());
458     return cb.getBuffer();
459   }
460 
/**
 * Whether the cache contains block with specified cacheKey
 * <p>
 * Checks only this L1 map; the victim/L2 cache is not consulted.
 * @param cacheKey key to look up
 * @return true if contains the block
 */
public boolean containsBlock(BlockCacheKey cacheKey) {
  return map.containsKey(cacheKey);
}
469 
470   @Override
471   public boolean evictBlock(BlockCacheKey cacheKey) {
472     LruCachedBlock cb = map.get(cacheKey);
473     if (cb == null) return false;
474     evictBlock(cb, false);
475     return true;
476   }
477 
478   /**
479    * Evicts all blocks for a specific HFile. This is an
480    * expensive operation implemented as a linear-time search through all blocks
481    * in the cache. Ideally this should be a search in a log-access-time map.
482    *
483    * <p>
484    * This is used for evict-on-close to remove all blocks of a specific HFile.
485    *
486    * @return the number of blocks evicted
487    */
488   @Override
489   public int evictBlocksByHfileName(String hfileName) {
490     int numEvicted = 0;
491     for (BlockCacheKey key : map.keySet()) {
492       if (key.getHfileName().equals(hfileName)) {
493         if (evictBlock(key))
494           ++numEvicted;
495       }
496     }
497     if (victimHandler != null) {
498       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
499     }
500     return numEvicted;
501   }
502 
/**
 * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
 * block may be read again later
 * @param block block to remove from this cache
 * @param evictedByEvictionProcess true if the given block is evicted by
 *          EvictionThread
 * @return the heap size of evicted block
 */
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
  map.remove(block.getCacheKey());
  updateSizeMetrics(block, true); // subtracts the block's heap size
  long val = elements.decrementAndGet();
  if (LOG.isTraceEnabled()) {
    long size = map.size();
    assertCounterSanity(size, val);
  }
  stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
  // Only hand the block down to L2 when it was pushed out by the eviction
  // process; explicit evictions (e.g. evict-on-close) do not repopulate it.
  if (evictedByEvictionProcess && victimHandler != null) {
    if (victimHandler instanceof BucketCache) {
      // 'wait' is true while we are still under the acceptable watermark —
      // presumably it makes the bucket-cache insert synchronous; confirm
      // against BucketCache.cacheBlockWithWait.
      boolean wait = getCurrentSize() < acceptableSize();
      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
      ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
          inMemory, wait);
    } else {
      victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
    }
  }
  return block.heapSize();
}
532 
533   /**
534    * Multi-threaded call to run the eviction process.
535    */
536   private void runEviction() {
537     if(evictionThread == null) {
538       evict();
539     } else {
540       evictionThread.evict();
541     }
542   }
543 
/**
 * Eviction method.
 *
 * Frees enough blocks to bring the cache from its current size down to the
 * {@code minFactor} watermark. Performs one full scan of the map, bucketing
 * blocks by priority, then frees from the buckets either strictly (memory
 * last, when forceInMemory) or proportionally to each bucket's overflow.
 */
void evict() {

  // Ensure only one eviction at a time
  if(!evictionLock.tryLock()) return;

  try {
    evictionInProgress = true;
    long currentSize = this.size.get();
    long bytesToFree = currentSize - minSize();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Block cache LRU eviction started; Attempting to free " +
        StringUtils.byteDesc(bytesToFree) + " of total=" +
        StringUtils.byteDesc(currentSize));
    }

    // Already under the watermark; the finally-block still records the run.
    if(bytesToFree <= 0) return;

    // Instantiate priority buckets
    BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
        singleSize());
    BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
        multiSize());
    BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
        memorySize());

    // Scan entire map putting into appropriate buckets
    for(LruCachedBlock cachedBlock : map.values()) {
      switch(cachedBlock.getPriority()) {
        case SINGLE: {
          bucketSingle.add(cachedBlock);
          break;
        }
        case MULTI: {
          bucketMulti.add(cachedBlock);
          break;
        }
        case MEMORY: {
          bucketMemory.add(cachedBlock);
          break;
        }
      }
    }

    long bytesFreed = 0;
    if (forceInMemory || memoryFactor > 0.999f) {
      // In-memory blocks get strict priority: drain single and multi first,
      // and only touch the memory bucket if that is still not enough.
      long s = bucketSingle.totalSize();
      long m = bucketMulti.totalSize();
      if (bytesToFree > (s + m)) {
        // this means we need to evict blocks in memory bucket to make room,
        // so the single and multi buckets will be emptied
        bytesFreed = bucketSingle.free(s);
        bytesFreed += bucketMulti.free(m);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " from single and multi buckets");
        }
        bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " total from all three buckets ");
        }
      } else {
        // this means no need to evict block in memory bucket,
        // and we try best to make the ratio between single-bucket and
        // multi-bucket is 1:2
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
          // single-bucket is small enough that no eviction happens for it
          // hence all eviction goes from multi-bucket
          bytesFreed = bucketMulti.free(bytesToFree);
        } else if (3 * m <= 2 * bytesRemain) {
          // multi-bucket is small enough that no eviction happens for it
          // hence all eviction goes from single-bucket
          bytesFreed = bucketSingle.free(bytesToFree);
        } else {
          // both buckets need to evict some blocks
          // shrink single down to its 1/3 share of what remains, then take
          // the rest from multi
          bytesFreed = bucketSingle.free(s - bytesRemain / 3);
          if (bytesFreed < bytesToFree) {
            bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
          }
        }
      }
    } else {
      // Fair mode: poll buckets in increasing-overflow order and let each
      // free at most min(its overflow, an equal share of what is left).
      PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;

      BlockBucket bucket;
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
              (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
    }

    if (LOG.isTraceEnabled()) {
      long single = bucketSingle.totalSize();
      long multi = bucketMulti.totalSize();
      long memory = bucketMemory.totalSize();
      LOG.trace("Block cache LRU eviction completed; " +
        "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
        "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
        "single=" + StringUtils.byteDesc(single) + ", " +
        "multi=" + StringUtils.byteDesc(multi) + ", " +
        "memory=" + StringUtils.byteDesc(memory));
    }
  } finally {
    stats.evict();
    evictionInProgress = false;
    evictionLock.unlock();
  }
}
669 
/**
 * Renders the cache's key metrics; Guava's toStringHelper emits entries in
 * the insertion order below.
 */
@Override
public String toString() {
  return Objects.toStringHelper(this)
    .add("blockCount", getBlockCount())
    .add("currentSize", getCurrentSize())
    .add("freeSize", getFreeSize())
    .add("maxSize", getMaxSize())
    .add("heapSize", heapSize())
    .add("minSize", minSize())
    .add("minFactor", minFactor)
    .add("multiSize", multiSize())
    .add("multiFactor", multiFactor)
    .add("singleSize", singleSize())
    .add("singleFactor", singleFactor)
    .toString();
}
686 
687   /**
688    * Used to group blocks into priority buckets.  There will be a BlockBucket
689    * for each priority (single, multi, memory).  Once bucketed, the eviction
690    * algorithm takes the appropriate number of elements out of each according
691    * to configuration parameters and their relatives sizes.
692    */
693   private class BlockBucket implements Comparable<BlockBucket> {
694 
695     private final String name;
696     private LruCachedBlockQueue queue;
697     private long totalSize = 0;
698     private long bucketSize;
699 
700     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
701       this.name = name;
702       this.bucketSize = bucketSize;
703       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
704       totalSize = 0;
705     }
706 
707     public void add(LruCachedBlock block) {
708       totalSize += block.heapSize();
709       queue.add(block);
710     }
711 
712     public long free(long toFree) {
713       if (LOG.isTraceEnabled()) {
714         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
715       }
716       LruCachedBlock cb;
717       long freedBytes = 0;
718       while ((cb = queue.pollLast()) != null) {
719         freedBytes += evictBlock(cb, true);
720         if (freedBytes >= toFree) {
721           return freedBytes;
722         }
723       }
724       if (LOG.isTraceEnabled()) {
725         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
726       }
727       return freedBytes;
728     }
729 
730     public long overflow() {
731       return totalSize - bucketSize;
732     }
733 
734     public long totalSize() {
735       return totalSize;
736     }
737 
738     public int compareTo(BlockBucket that) {
739       if(this.overflow() == that.overflow()) return 0;
740       return this.overflow() > that.overflow() ? 1 : -1;
741     }
742 
743     @Override
744     public boolean equals(Object that) {
745       if (that == null || !(that instanceof BlockBucket)){
746         return false;
747       }
748       return compareTo((BlockBucket)that) == 0;
749     }
750 
751     @Override
752     public int hashCode() {
753       return Objects.hashCode(name, bucketSize, queue, totalSize);
754     }
755 
756     @Override
757     public String toString() {
758       return Objects.toStringHelper(this)
759         .add("name", name)
760         .add("totalSize", StringUtils.byteDesc(totalSize))
761         .add("bucketSize", StringUtils.byteDesc(bucketSize))
762         .toString();
763     }
764   }
765 
/**
 * Get the maximum size of this cache.
 * Note this value is mutable via {@link #setMaxSize(long)}.
 * @return max size in bytes
 */
public long getMaxSize() {
  return this.maxSize;
}
773 
/**
 * Current heap occupancy of the cache, including the fixed structural
 * overhead charged at construction time.
 * @return current size in bytes
 */
@Override
public long getCurrentSize() {
  return this.size.get();
}
778 
779   @Override
780   public long getFreeSize() {
781     return getMaxSize() - getCurrentSize();
782   }
783 
784   @Override
785   public long size() {
786     return getMaxSize();
787   }
788 
789   @Override
790   public long getBlockCount() {
791     return this.elements.get();
792   }
793 
794   EvictionThread getEvictionThread() {
795     return this.evictionThread;
796   }
797 
798   /*
799    * Eviction thread.  Sits in waiting state until an eviction is triggered
800    * when the cache size grows above the acceptable level.<p>
801    *
802    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
803    */
804   static class EvictionThread extends HasThread {
805     private WeakReference<LruBlockCache> cache;
806     private volatile boolean go = true;
807     // flag set after enter the run method, used for test
808     private boolean enteringRun = false;
809 
810     public EvictionThread(LruBlockCache cache) {
811       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
812       setDaemon(true);
813       this.cache = new WeakReference<LruBlockCache>(cache);
814     }
815 
816     @Override
817     public void run() {
818       enteringRun = true;
819       while (this.go) {
820         synchronized(this) {
821           try {
822             this.wait(1000 * 10/*Don't wait for ever*/);
823           } catch(InterruptedException e) {
824             LOG.warn("Interrupted eviction thread ", e);
825             Thread.currentThread().interrupt();
826           }
827         }
828         LruBlockCache cache = this.cache.get();
829         if (cache == null) break;
830         cache.evict();
831       }
832     }
833 
834     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
835         justification="This is what we want")
836     public void evict() {
837       synchronized(this) {
838         this.notifyAll();
839       }
840     }
841 
842     synchronized void shutdown() {
843       this.go = false;
844       this.notifyAll();
845     }
846 
847     /**
848      * Used for the test.
849      */
850     boolean isEnteringRun() {
851       return this.enteringRun;
852     }
853   }
854 
855   /*
856    * Statistics thread.  Periodically prints the cache statistics to the log.
857    */
858   static class StatisticsThread extends Thread {
859     private final LruBlockCache lru;
860 
861     public StatisticsThread(LruBlockCache lru) {
862       super("LruBlockCacheStats");
863       setDaemon(true);
864       this.lru = lru;
865     }
866 
867     @Override
868     public void run() {
869       lru.logStats();
870     }
871   }
872 
873   public void logStats() {
874     // Log size
875     long totalSize = heapSize();
876     long freeSize = maxSize - totalSize;
877     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
878         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
879         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
880         "blockCount=" + getBlockCount() + ", " +
881         "accesses=" + stats.getRequestCount() + ", " +
882         "hits=" + stats.getHitCount() + ", " +
883         "hitRatio=" + (stats.getHitCount() == 0 ?
884           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
885         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
886         "cachingHits=" + stats.getHitCachingCount() + ", " +
887         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
888           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
889         "evictions=" + stats.getEvictionCount() + ", " +
890         "evicted=" + stats.getEvictedCount() + ", " +
891         "evictedPerRun=" + stats.evictedPerEviction());
892   }
893 
894   /**
895    * Get counter statistics for this cache.
896    *
897    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
898    * of the eviction processes.
899    */
900   public CacheStats getStats() {
901     return this.stats;
902   }
903 
  /**
   * Fixed per-instance heap overhead: object header plus this class's declared
   * primitive and reference fields, aligned.
   * NOTE(review): the counts (3 longs, 10 refs, 5 floats, 2 booleans) must be
   * kept in sync with the field declarations -- verify when fields change.
   */
  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (3 * Bytes.SIZEOF_LONG) + (10 * ClassSize.REFERENCE) +
      (5 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
      + ClassSize.OBJECT);
908 
909   @Override
910   public long heapSize() {
911     return getCurrentSize();
912   }
913 
914   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
915     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
916     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
917         ((long)Math.ceil(maxSize*1.2/blockSize)
918             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
919         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
920   }
921 
922   @Override
923   public Iterator<CachedBlock> iterator() {
924     final Iterator<LruCachedBlock> iterator = map.values().iterator();
925 
926     return new Iterator<CachedBlock>() {
927       private final long now = System.nanoTime();
928 
929       @Override
930       public boolean hasNext() {
931         return iterator.hasNext();
932       }
933 
934       @Override
935       public CachedBlock next() {
936         final LruCachedBlock b = iterator.next();
937         return new CachedBlock() {
938           @Override
939           public String toString() {
940             return BlockCacheUtil.toString(this, now);
941           }
942 
943           @Override
944           public BlockPriority getBlockPriority() {
945             return b.getPriority();
946           }
947 
948           @Override
949           public BlockType getBlockType() {
950             return b.getBuffer().getBlockType();
951           }
952 
953           @Override
954           public long getOffset() {
955             return b.getCacheKey().getOffset();
956           }
957 
958           @Override
959           public long getSize() {
960             return b.getBuffer().heapSize();
961           }
962 
963           @Override
964           public long getCachedTime() {
965             return b.getCachedTime();
966           }
967 
968           @Override
969           public String getFilename() {
970             return b.getCacheKey().getHfileName();
971           }
972 
973           @Override
974           public int compareTo(CachedBlock other) {
975             int diff = this.getFilename().compareTo(other.getFilename());
976             if (diff != 0) return diff;
977             diff = (int)(this.getOffset() - other.getOffset());
978             if (diff != 0) return diff;
979             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
980               throw new IllegalStateException("" + this.getCachedTime() + ", " +
981                 other.getCachedTime());
982             }
983             return (int)(other.getCachedTime() - this.getCachedTime());
984           }
985 
986           @Override
987           public int hashCode() {
988             return b.hashCode();
989           }
990 
991           @Override
992           public boolean equals(Object obj) {
993             if (obj instanceof CachedBlock) {
994               CachedBlock cb = (CachedBlock)obj;
995               return compareTo(cb) == 0;
996             } else {
997               return false;
998             }
999           }
1000         };
1001       }
1002 
1003       @Override
1004       public void remove() {
1005         throw new UnsupportedOperationException();
1006       }
1007     };
1008   }
1009 
1010   // Simple calculators of sizes given factors and maxSize
1011 
1012   long acceptableSize() {
1013     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1014   }
1015   private long minSize() {
1016     return (long)Math.floor(this.maxSize * this.minFactor);
1017   }
1018   private long singleSize() {
1019     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1020   }
1021   private long multiSize() {
1022     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1023   }
1024   private long memorySize() {
1025     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1026   }
1027 
1028   public void shutdown() {
1029     if (victimHandler != null)
1030       victimHandler.shutdown();
1031     this.scheduleThreadPool.shutdown();
1032     for (int i = 0; i < 10; i++) {
1033       if (!this.scheduleThreadPool.isShutdown()) {
1034         try {
1035           Thread.sleep(10);
1036         } catch (InterruptedException e) {
1037           LOG.warn("Interrupted while sleeping");
1038           Thread.currentThread().interrupt();
1039           break;
1040         }
1041       }
1042     }
1043 
1044     if (!this.scheduleThreadPool.isShutdown()) {
1045       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1046       LOG.debug("Still running " + runnables);
1047     }
1048     this.evictionThread.shutdown();
1049   }
1050 
  /** Clears the cache. Used in tests. */
  @VisibleForTesting
  public void clearCache() {
    // NOTE(review): resets the block map and element count but leaves the
    // byte-size counter ('size') untouched -- confirm no test relies on
    // getCurrentSize() returning 0 after clearing.
    this.map.clear();
    this.elements.set(0);
  }
1057 
1058   /**
1059    * Used in testing. May be very inefficient.
1060    * @return the set of cached file names
1061    */
1062   @VisibleForTesting
1063   SortedSet<String> getCachedFileNamesForTest() {
1064     SortedSet<String> fileNames = new TreeSet<String>();
1065     for (BlockCacheKey cacheKey : map.keySet()) {
1066       fileNames.add(cacheKey.getHfileName());
1067     }
1068     return fileNames;
1069   }
1070 
1071   @VisibleForTesting
1072   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1073     Map<BlockType, Integer> counts =
1074         new EnumMap<BlockType, Integer>(BlockType.class);
1075     for (LruCachedBlock cb : map.values()) {
1076       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1077       Integer count = counts.get(blockType);
1078       counts.put(blockType, (count == null ? 0 : count) + 1);
1079     }
1080     return counts;
1081   }
1082 
1083   @VisibleForTesting
1084   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1085     Map<DataBlockEncoding, Integer> counts =
1086         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1087     for (LruCachedBlock block : map.values()) {
1088       DataBlockEncoding encoding =
1089               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1090       Integer count = counts.get(encoding);
1091       counts.put(encoding, (count == null ? 0 : count) + 1);
1092     }
1093     return counts;
1094   }
1095 
1096   public void setVictimCache(BlockCache handler) {
1097     assert victimHandler == null;
1098     victimHandler = handler;
1099   }
1100 
1101   @VisibleForTesting
1102   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1103     return map;
1104   }
1105 
1106   BlockCache getVictimHandler() {
1107     return this.victimHandler;
1108   }
1109 
  @Override
  public BlockCache[] getBlockCaches() {
    // No nested caches to expose; callers must handle the null return.
    return null;
  }
1114 
  /** No-op: this cache tracks nothing for returned blocks. */
  @Override
  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
    // There is no SHARED type here. Just return
  }
1119 }