1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import com.google.common.base.Objects;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for
61   * scan-resistance and in-memory column families (an in-memory column family, see
62   * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}, is a
63   * column family that should be served from memory if possible):
64   * single-access, multiple-access, and in-memory priority.
65   * A block is added with an in-memory priority flag if
66   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is set;
67   * otherwise a block becomes a single-access
68   * priority block the first time it is read into this block cache.  If a block is accessed again
69   * while in cache, it is promoted to multiple-access priority.  This delineation of blocks is
70   * used to prevent scans from thrashing the cache by adding a least-frequently-used
71   * element to the eviction algorithm.<p>
72   *
73   * Each priority is given its own chunk of the total cache to ensure
74   * fairness during eviction.  Each priority will retain close to its maximum
75   * size; however, if any priority is not using its entire chunk, the others
76   * are able to grow beyond their chunk size.<p>
77   *
78   * Instantiated at a minimum with the total size and average block size.
79   * All sizes are in bytes.  The block size is not especially important as this
80   * cache is fully dynamic in its sizing of blocks.  It is only used for
81   * pre-allocating data structures and in initial heap estimation of the map.<p>
82   *
83   * The detailed constructor defines the sizes for the three priorities (they
84   * should total to the <code>maximum size</code> defined).  It also sets the levels that
85   * trigger and control the eviction thread.<p>
86   *
87   * The <code>acceptable size</code> is the cache size level which triggers the eviction
88   * process to start.  It evicts enough blocks to get the size below the
89   * minimum size specified.<p>
90   *
91   * Eviction happens in a separate thread and involves a single full-scan
92   * of the map.  It determines how many bytes must be freed to reach the minimum
93   * size, and then, while scanning, collects the fewest least-recently-used
94   * blocks necessary from each of the three priorities (up to 3 times the bytes
95   * to free in total).  It then uses the priority chunk sizes to evict fairly according
96   * to the relative sizes and usage.
97   */
98  @InterfaceAudience.Private
99  @JsonIgnoreProperties({"encodingCountsForTest"})
100 public class LruBlockCache implements ResizableBlockCache, HeapSize {
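      // Illustrative usage sketch (the sizes and variable names below are hypothetical,
      // not defaults taken from this file):
      //   LruBlockCache cache = new LruBlockCache(512L * 1024 * 1024, 64 * 1024);
      //   cache.cacheBlock(key, block);                          // enters at single-access priority
      //   Cacheable b = cache.getBlock(key, true, false, true);  // a repeat access promotes to multi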
101 
102   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
103 
104   /**
105    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
106    * evicting during an eviction run till the cache size is down to 80% of the total.
107    */
108   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
109 
110   /**
111    * Acceptable size of cache (no evictions if size < acceptable)
112    */
113   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
114 
115   /**
116    * Hard capacity limit of the cache; any put is rejected if size > this * acceptable size
117    */
118   static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.hard.capacity.limit.factor";
119   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
120   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
121   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
122 
123   /**
124    * Configuration key to force data blocks of an in-memory hfile to always be cached
125    * (unless even the in-memory blocks exceed the cache).  Unlike inMemory, which is a
126    * column-family configuration, inMemoryForceMode is a cluster-wide configuration.
127    */
128   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
129 
130   /** Default Configuration Parameters*/
131 
132   /** Backing Concurrent Map Configuration */
133   static final float DEFAULT_LOAD_FACTOR = 0.75f;
134   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
135 
136   /** Eviction thresholds */
137   static final float DEFAULT_MIN_FACTOR = 0.95f;
138   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
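      // With the defaults above, eviction starts once the cache grows past 99% of maxSize and
      // frees blocks until it is back under 95% of maxSize.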
139 
140   /** Priority buckets */
141   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
142   static final float DEFAULT_MULTI_FACTOR = 0.50f;
143   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
144 
145   /** default hard capacity limit */
146   static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
147 
148   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
149 
150   /** Statistics thread */
151   static final int statThreadPeriod = 60 * 5;
152   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
153   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
154 
155   /** Concurrent map (the cache) */
156   private final Map<BlockCacheKey,LruCachedBlock> map;
157 
158   /** Eviction lock (locked when eviction in process) */
159   private final ReentrantLock evictionLock = new ReentrantLock(true);
160   private final long maxBlockSize;
161 
162   /** Volatile boolean to track if we are in an eviction process or not */
163   private volatile boolean evictionInProgress = false;
164 
165   /** Eviction thread */
166   private final EvictionThread evictionThread;
167 
168   /** Statistics thread schedule pool (for heavy debugging, could remove) */
169   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
170     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
171 
172   /** Current size of cache */
173   private final AtomicLong size;
174 
175   /** Current number of cached elements */
176   private final AtomicLong elements;
177 
178   /** Cache access count (sequential ID) */
179   private final AtomicLong count;
180 
181   /** hard capacity limit */
182   private float hardCapacityLimitFactor;
183 
184   /** Cache statistics */
185   private final CacheStats stats;
186 
187   /** Maximum allowable size of cache (block put if size > max, evict) */
188   private long maxSize;
189 
190   /** Approximate block size */
191   private long blockSize;
192 
193   /** Acceptable size of cache (no evictions if size < acceptable) */
194   private float acceptableFactor;
195 
196   /** Minimum threshold of cache (when evicting, evict until size < min) */
197   private float minFactor;
198 
199   /** Single access bucket size */
200   private float singleFactor;
201 
202   /** Multiple access bucket size */
203   private float multiFactor;
204 
205   /** In-memory bucket size */
206   private float memoryFactor;
207 
208   /** Overhead of the structure itself */
209   private long overhead;
210 
211   /** Whether in-memory hfile's data block has higher priority when evicting */
212   private boolean forceInMemory;
213 
214   /** Where to send victims (blocks evicted/missing from the cache) */
215   private BlockCache victimHandler = null;
216 
217   /**
218    * Default constructor.  Specify maximum size and expected average block
219    * size (approximation is fine).
220    *
221    * <p>All other factors will be calculated based on defaults specified in
222    * this class.
223    * @param maxSize maximum size of cache, in bytes
224    * @param blockSize approximate size of each block, in bytes
225    */
226   public LruBlockCache(long maxSize, long blockSize) {
227     this(maxSize, blockSize, true);
228   }
229 
230   /**
231    * Constructor used for testing.  Allows disabling of the eviction thread.
232    */
233   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
234     this(maxSize, blockSize, evictionThread,
235         (int)Math.ceil(1.2*maxSize/blockSize),
236         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
237         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
238         DEFAULT_SINGLE_FACTOR,
239         DEFAULT_MULTI_FACTOR,
240         DEFAULT_MEMORY_FACTOR,
241         DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
242         false,
243         DEFAULT_MAX_BLOCK_SIZE
244         );
245   }
246 
247   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
248     this(maxSize, blockSize, evictionThread,
249         (int)Math.ceil(1.2*maxSize/blockSize),
250         DEFAULT_LOAD_FACTOR,
251         DEFAULT_CONCURRENCY_LEVEL,
252         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
253         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
254         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
255         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
256         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
257         conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
258         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
259         conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
260         );
261   }
262 
263   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
264     this(maxSize, blockSize, true, conf);
265   }
266 
267   /**
268    * Configurable constructor.  Use this constructor if not using defaults.
269    * @param maxSize maximum size of this cache, in bytes
270    * @param blockSize expected average size of blocks, in bytes
271    * @param evictionThread whether to run evictions in a bg thread or not
272    * @param mapInitialSize initial size of backing ConcurrentHashMap
273    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
274    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
275    * @param minFactor percentage of total size that eviction will evict until
276    * @param acceptableFactor percentage of total size that triggers eviction
277    * @param singleFactor percentage of total size for single-access blocks
278    * @param multiFactor percentage of total size for multiple-access blocks
279    * @param memoryFactor percentage of total size for in-memory blocks
280    */
281   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
282       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
283       float minFactor, float acceptableFactor, float singleFactor,
284       float multiFactor, float memoryFactor, float hardLimitFactor,
285       boolean forceInMemory, long maxBlockSize) {
286     this.maxBlockSize = maxBlockSize;
287     if(singleFactor + multiFactor + memoryFactor != 1 ||
288         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
289       throw new IllegalArgumentException("Single, multi, and memory factors " +
290           "should be non-negative and total 1.0");
291     }
292     if(minFactor >= acceptableFactor) {
293       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
294     }
295     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
296       throw new IllegalArgumentException("all factors must be < 1");
297     }
298     this.maxSize = maxSize;
299     this.blockSize = blockSize;
300     this.forceInMemory = forceInMemory;
301     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
302         mapLoadFactor, mapConcurrencyLevel);
303     this.minFactor = minFactor;
304     this.acceptableFactor = acceptableFactor;
305     this.singleFactor = singleFactor;
306     this.multiFactor = multiFactor;
307     this.memoryFactor = memoryFactor;
308     this.stats = new CacheStats(this.getClass().getSimpleName());
309     this.count = new AtomicLong(0);
310     this.elements = new AtomicLong(0);
311     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
312     this.size = new AtomicLong(this.overhead);
313     this.hardCapacityLimitFactor = hardLimitFactor;
314     if(evictionThread) {
315       this.evictionThread = new EvictionThread(this);
316       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
317     } else {
318       this.evictionThread = null;
319     }
320     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
321     // every five minutes.
322     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
323         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
324   }
325 
326   @Override
327   public void setMaxSize(long maxSize) {
328     this.maxSize = maxSize;
329     if(this.size.get() > acceptableSize() && !evictionInProgress) {
330       runEviction();
331     }
332   }
333 
334   // BlockCache implementation
335 
336   /**
337    * Cache the block with the specified name and buffer.
338    * <p>
339    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
340    * this can happen, for which we compare the buffer contents.
341    * @param cacheKey block's cache key
342    * @param buf block buffer
343    * @param inMemory if block is in-memory
344    * @param cacheDataInL1 not used by this implementation
345    */
346   @Override
347   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
348       final boolean cacheDataInL1) {
349 
350     if (buf.heapSize() > maxBlockSize) {
351       // If there are a lot of blocks that are too
352       // big this can make the logs way too noisy.
353       // So we log 2%
354       if (stats.failInsert() % 50 == 0) {
355         LOG.warn("Trying to cache too large a block "
356             + cacheKey.getHfileName() + " @ "
357             + cacheKey.getOffset()
358             + " is " + buf.heapSize()
359             + " which is larger than " + maxBlockSize);
360       }
361       return;
362     }
363 
364     LruCachedBlock cb = map.get(cacheKey);
365     if (cb != null) {
366       // compare the contents, if they are not equal, we are in big trouble
367       if (compare(buf, cb.getBuffer()) != 0) {
368         throw new RuntimeException("Cached block contents differ, which should not have happened."
369           + " cacheKey:" + cacheKey);
370       }
371       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
372       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
373       LOG.debug(msg);
374       return;
375     }
376     long currentSize = size.get();
377     long currentAcceptableSize = acceptableSize();
378     long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
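        // Illustrative arithmetic (hypothetical sizes): with maxSize = 1 GB, acceptableFactor = 0.99
        // and hardCapacityLimitFactor = 1.2, hardLimitSize is roughly 1.19 GB; beyond that the put
        // is refused and an eviction run is triggered if one is not already in progress.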
379     if (currentSize >= hardLimitSize) {
380       stats.failInsert();
381       if (LOG.isTraceEnabled()) {
382         LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
383           + " has exceeded its acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " by too much;"
384           + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
385           + cacheKey + " into LruBlockCache.");
386       }
387       if (!evictionInProgress) {
388         runEviction();
389       }
390       return;
391     }
392     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
393     long newSize = updateSizeMetrics(cb, false);
394     map.put(cacheKey, cb);
395     long val = elements.incrementAndGet();
396     if (LOG.isTraceEnabled()) {
397       long size = map.size();
398       assertCounterSanity(size, val);
399     }
400     if (newSize > currentAcceptableSize && !evictionInProgress) {
401       runEviction();
402     }
403   }
404 
405   /**
406    * Sanity-checking for parity between actual block cache content and metrics.
407    * Intended only for use with TRACE level logging and -ea JVM.
408    */
409   private static void assertCounterSanity(long mapSize, long counterVal) {
410     if (counterVal < 0) {
411       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
412         ", mapSize=" + mapSize);
413       return;
414     }
415     if (mapSize < Integer.MAX_VALUE) {
416       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
417       if (pct_diff > 0.05) {
418         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
419           ", mapSize=" + mapSize);
420       }
421     }
422   }
423 
424   private int compare(Cacheable left, Cacheable right) {
425     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
426     left.serialize(l);
427     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
428     right.serialize(r);
429     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
430       r.array(), r.arrayOffset(), r.limit());
431   }
432 
433   /**
434    * Cache the block with the specified name and buffer.
435    * <p>
436    * @param cacheKey block's cache key
437    * @param buf block buffer
438    */
439   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
440     cacheBlock(cacheKey, buf, false, false);
441   }
442 
443   /**
444    * Helper function that updates the local size counter and also updates any
445    * per-cf or per-blocktype metrics it can discern from the given
446    * {@link LruCachedBlock}
447    *
448    * @param cb the block being added to or removed from the cache
449    * @param evict true if the block is being evicted; its heap size is then subtracted
450    */
451   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
452     long heapsize = cb.heapSize();
453     if (evict) {
454       heapsize *= -1;
455     }
456     return size.addAndGet(heapsize);
457   }
458 
459   /**
460    * Get the buffer of the block with the specified name.
461    * @param cacheKey block's cache key
462    * @param caching true if the caller caches blocks on cache misses
463    * @param repeat Whether this is a repeat lookup for the same block
464    *        (used to avoid double counting cache misses when doing double-check locking)
465    * @param updateCacheMetrics Whether to update cache metrics or not
466    * @return buffer of specified cache key, or null if not in cache
467    */
468   @Override
469   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
470       boolean updateCacheMetrics) {
471     LruCachedBlock cb = map.get(cacheKey);
472     if (cb == null) {
473       if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
474       // If there is another block cache then try and read there.
475       // However if this is a retry (i.e. the second lookup in double-checked locking)
476       // and it was already a miss then the L2 will also be a miss.
477       if (victimHandler != null && !repeat) {
478         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
479 
480         // Promote this to L1.
481         if (result != null && caching) {
482           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
483         }
484         return result;
485       }
486       return null;
487     }
488     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
489     cb.access(count.incrementAndGet());
490     return cb.getBuffer();
491   }
492 
493   /**
494    * Whether the cache contains the block with the specified cacheKey
495    * @param cacheKey the block's cache key
496    * @return true if the cache contains the block
497    */
498   public boolean containsBlock(BlockCacheKey cacheKey) {
499     return map.containsKey(cacheKey);
500   }
501 
502   @Override
503   public boolean evictBlock(BlockCacheKey cacheKey) {
504     LruCachedBlock cb = map.get(cacheKey);
505     if (cb == null) return false;
506     evictBlock(cb, false);
507     return true;
508   }
509 
510   /**
511    * Evicts all blocks for a specific HFile. This is an
512    * expensive operation implemented as a linear-time search through all blocks
513    * in the cache. Ideally this should be a search in a log-access-time map.
514    *
515    * <p>
516    * This is used for evict-on-close to remove all blocks of a specific HFile.
517    *
518    * @return the number of blocks evicted
519    */
520   @Override
521   public int evictBlocksByHfileName(String hfileName) {
522     int numEvicted = 0;
523     for (BlockCacheKey key : map.keySet()) {
524       if (key.getHfileName().equals(hfileName)) {
525         if (evictBlock(key))
526           ++numEvicted;
527       }
528     }
529     if (victimHandler != null) {
530       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
531     }
532     return numEvicted;
533   }
534 
535   /**
536    * Evict the block; it will be cached by the victim handler if one exists
537    * and the block may be read again later
538    * @param block the block to evict
539    * @param evictedByEvictionProcess true if the given block is evicted by
540    *          EvictionThread
541    * @return the heap size of the evicted block
542    */
543   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
544     map.remove(block.getCacheKey());
545     updateSizeMetrics(block, true);
546     long val = elements.decrementAndGet();
547     if (LOG.isTraceEnabled()) {
548       long size = map.size();
549       assertCounterSanity(size, val);
550     }
551     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
552     if (evictedByEvictionProcess && victimHandler != null) {
553       if (victimHandler instanceof BucketCache) {
554         boolean wait = getCurrentSize() < acceptableSize();
555         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
556         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
557             inMemory, wait);
558       } else {
559         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
560       }
561     }
562     return block.heapSize();
563   }
564 
565   /**
566    * Trigger the eviction process; may be called from multiple threads.
567    */
568   private void runEviction() {
569     if(evictionThread == null) {
570       evict();
571     } else {
572       evictionThread.evict();
573     }
574   }
575 
576   /**
577    * Eviction method.
578    */
579   void evict() {
580 
581     // Ensure only one eviction at a time
582     if(!evictionLock.tryLock()) return;
583 
584     try {
585       evictionInProgress = true;
586       long currentSize = this.size.get();
587       long bytesToFree = currentSize - minSize();
588 
589       if (LOG.isTraceEnabled()) {
590         LOG.trace("Block cache LRU eviction started; Attempting to free " +
591           StringUtils.byteDesc(bytesToFree) + " of total=" +
592           StringUtils.byteDesc(currentSize));
593       }
594 
595       if(bytesToFree <= 0) return;
596 
597       // Instantiate priority buckets
598       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
599           singleSize());
600       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
601           multiSize());
602       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
603           memorySize());
604 
605       // Scan entire map putting into appropriate buckets
606       for(LruCachedBlock cachedBlock : map.values()) {
607         switch(cachedBlock.getPriority()) {
608           case SINGLE: {
609             bucketSingle.add(cachedBlock);
610             break;
611           }
612           case MULTI: {
613             bucketMulti.add(cachedBlock);
614             break;
615           }
616           case MEMORY: {
617             bucketMemory.add(cachedBlock);
618             break;
619           }
620         }
621       }
622 
623       long bytesFreed = 0;
624       if (forceInMemory || memoryFactor > 0.999f) {
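            // In force mode (or when the memory bucket is effectively the whole cache), drain the
            // single and multi buckets first; the in-memory bucket is only touched if that is not
            // enough to reach bytesToFree.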
625         long s = bucketSingle.totalSize();
626         long m = bucketMulti.totalSize();
627         if (bytesToFree > (s + m)) {
628           // this means we need to evict blocks in memory bucket to make room,
629           // so the single and multi buckets will be emptied
630           bytesFreed = bucketSingle.free(s);
631           bytesFreed += bucketMulti.free(m);
632           if (LOG.isTraceEnabled()) {
633             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
634               " from single and multi buckets");
635           }
636           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
637           if (LOG.isTraceEnabled()) {
638             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
639               " total from all three buckets ");
640           }
641         } else {
642           // this means no blocks in the memory bucket need to be evicted, and we
643           // try our best to keep the post-eviction ratio between the single and
644           // multi buckets at 1:2
645           long bytesRemain = s + m - bytesToFree;
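              // Worked example (hypothetical numbers): with s=400, m=500 and bytesToFree=300,
              // bytesRemain=600; neither shortcut below applies (3*400 > 600 and 3*500 > 1200),
              // so the single bucket is trimmed to bytesRemain/3 = 200 and the multi bucket frees
              // the remaining 100, leaving 200 and 400 bytes, the desired 1:2 ratio.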
646           if (3 * s <= bytesRemain) {
647             // single-bucket is small enough that no eviction happens for it
648             // hence all eviction goes from multi-bucket
649             bytesFreed = bucketMulti.free(bytesToFree);
650           } else if (3 * m <= 2 * bytesRemain) {
651             // multi-bucket is small enough that no eviction happens for it
652             // hence all eviction goes from single-bucket
653             bytesFreed = bucketSingle.free(bytesToFree);
654           } else {
655             // both buckets need to evict some blocks
656             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
657             if (bytesFreed < bytesToFree) {
658               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
659             }
660           }
661         }
662       } else {
663         PriorityQueue<BlockBucket> bucketQueue =
664           new PriorityQueue<BlockBucket>(3);
665 
666         bucketQueue.add(bucketSingle);
667         bucketQueue.add(bucketMulti);
668         bucketQueue.add(bucketMemory);
669 
670         int remainingBuckets = 3;
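            // Buckets are polled least-overflowing first; each frees at most its own overflow and
            // at most an equal share of what still needs freeing, so the bucket furthest over its
            // chunk size ends up absorbing the most eviction.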
671 
672         BlockBucket bucket;
673         while((bucket = bucketQueue.poll()) != null) {
674           long overflow = bucket.overflow();
675           if(overflow > 0) {
676             long bucketBytesToFree = Math.min(overflow,
677                 (bytesToFree - bytesFreed) / remainingBuckets);
678             bytesFreed += bucket.free(bucketBytesToFree);
679           }
680           remainingBuckets--;
681         }
682       }
683 
684       if (LOG.isTraceEnabled()) {
685         long single = bucketSingle.totalSize();
686         long multi = bucketMulti.totalSize();
687         long memory = bucketMemory.totalSize();
688         LOG.trace("Block cache LRU eviction completed; " +
689           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
690           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
691           "single=" + StringUtils.byteDesc(single) + ", " +
692           "multi=" + StringUtils.byteDesc(multi) + ", " +
693           "memory=" + StringUtils.byteDesc(memory));
694       }
695     } finally {
696       stats.evict();
697       evictionInProgress = false;
698       evictionLock.unlock();
699     }
700   }
701 
702   @Override
703   public String toString() {
704     return Objects.toStringHelper(this)
705       .add("blockCount", getBlockCount())
706       .add("currentSize", getCurrentSize())
707       .add("freeSize", getFreeSize())
708       .add("maxSize", getMaxSize())
709       .add("heapSize", heapSize())
710       .add("minSize", minSize())
711       .add("minFactor", minFactor)
712       .add("multiSize", multiSize())
713       .add("multiFactor", multiFactor)
714       .add("singleSize", singleSize())
715       .add("singleFactor", singleFactor)
716       .toString();
717   }
718 
719   /**
720    * Used to group blocks into priority buckets.  There will be a BlockBucket
721    * for each priority (single, multi, memory).  Once bucketed, the eviction
722    * algorithm takes the appropriate number of elements out of each according
723    * to configuration parameters and their relative sizes.
724    */
725   private class BlockBucket implements Comparable<BlockBucket> {
726 
727     private final String name;
728     private LruCachedBlockQueue queue;
729     private long totalSize = 0;
730     private long bucketSize;
731 
732     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
733       this.name = name;
734       this.bucketSize = bucketSize;
735       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
736       totalSize = 0;
737     }
738 
739     public void add(LruCachedBlock block) {
740       totalSize += block.heapSize();
741       queue.add(block);
742     }
743 
744     public long free(long toFree) {
745       if (LOG.isTraceEnabled()) {
746         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
747       }
748       LruCachedBlock cb;
749       long freedBytes = 0;
750       while ((cb = queue.pollLast()) != null) {
751         freedBytes += evictBlock(cb, true);
752         if (freedBytes >= toFree) {
753           return freedBytes;
754         }
755       }
756       if (LOG.isTraceEnabled()) {
757         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
758       }
759       return freedBytes;
760     }
761 
762     public long overflow() {
763       return totalSize - bucketSize;
764     }
765 
766     public long totalSize() {
767       return totalSize;
768     }
769 
770     public int compareTo(BlockBucket that) {
771       return Long.compare(this.overflow(), that.overflow());
772     }
773 
774     @Override
775     public boolean equals(Object that) {
776       if (that == null || !(that instanceof BlockBucket)){
777         return false;
778       }
779       return compareTo((BlockBucket)that) == 0;
780     }
781 
782     @Override
783     public int hashCode() {
784       return Objects.hashCode(name, bucketSize, queue, totalSize);
785     }
786 
787     @Override
788     public String toString() {
789       return Objects.toStringHelper(this)
790         .add("name", name)
791         .add("totalSize", StringUtils.byteDesc(totalSize))
792         .add("bucketSize", StringUtils.byteDesc(bucketSize))
793         .toString();
794     }
795   }
796 
797   /**
798    * Get the maximum size of this cache.
799    * @return max size in bytes
800    */
801   public long getMaxSize() {
802     return this.maxSize;
803   }
804 
805   @Override
806   public long getCurrentSize() {
807     return this.size.get();
808   }
809 
810   @Override
811   public long getFreeSize() {
812     return getMaxSize() - getCurrentSize();
813   }
814 
815   @Override
816   public long size() {
817     return getMaxSize();
818   }
819 
820   @Override
821   public long getBlockCount() {
822     return this.elements.get();
823   }
824 
825   EvictionThread getEvictionThread() {
826     return this.evictionThread;
827   }
828 
829   /*
830    * Eviction thread.  Sits in waiting state until an eviction is triggered
831    * when the cache size grows above the acceptable level.<p>
832    *
833    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
834    */
835   static class EvictionThread extends HasThread {
836     private WeakReference<LruBlockCache> cache;
837     private volatile boolean go = true;
838     // flag set after enter the run method, used for test
839     private boolean enteringRun = false;
840 
841     public EvictionThread(LruBlockCache cache) {
842       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
843       setDaemon(true);
844       this.cache = new WeakReference<LruBlockCache>(cache);
845     }
846 
847     @Override
848     public void run() {
849       enteringRun = true;
850       while (this.go) {
851         synchronized(this) {
852           try {
853             this.wait(1000 * 10/*Don't wait for ever*/);
854           } catch(InterruptedException e) {
855             LOG.warn("Interrupted eviction thread ", e);
856             Thread.currentThread().interrupt();
857           }
858         }
859         LruBlockCache cache = this.cache.get();
860         if (cache == null) break;
861         cache.evict();
862       }
863     }
864 
865     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
866         justification="This is what we want")
867     public void evict() {
868       synchronized(this) {
869         this.notifyAll();
870       }
871     }
872 
873     synchronized void shutdown() {
874       this.go = false;
875       this.notifyAll();
876     }
877 
878     /**
879      * Used for the test.
880      */
881     boolean isEnteringRun() {
882       return this.enteringRun;
883     }
884   }
885 
886   /*
887    * Statistics thread.  Periodically prints the cache statistics to the log.
888    */
889   static class StatisticsThread extends Thread {
890     private final LruBlockCache lru;
891 
892     public StatisticsThread(LruBlockCache lru) {
893       super("LruBlockCacheStats");
894       setDaemon(true);
895       this.lru = lru;
896     }
897 
898     @Override
899     public void run() {
900       lru.logStats();
901     }
902   }
903 
904   public void logStats() {
905     // Log size
906     long totalSize = heapSize();
907     long freeSize = maxSize - totalSize;
908     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
909         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
910         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
911         "blockCount=" + getBlockCount() + ", " +
912         "accesses=" + stats.getRequestCount() + ", " +
913         "hits=" + stats.getHitCount() + ", " +
914         "hitRatio=" + (stats.getHitCount() == 0 ?
915           "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
916         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
917         "cachingHits=" + stats.getHitCachingCount() + ", " +
918         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
919           "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
920         "evictions=" + stats.getEvictionCount() + ", " +
921         "evicted=" + stats.getEvictedCount() + ", " +
922         "evictedPerRun=" + stats.evictedPerEviction());
923   }
924 
925   /**
926    * Get counter statistics for this cache.
927    *
928    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
929    * of the eviction processes.
930    */
931   public CacheStats getStats() {
932     return this.stats;
933   }
934 
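      // Fixed overhead of this class's own fields: 4 longs, 9 object references, 6 floats and
      // 2 booleans, plus the object header, aligned.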
935   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
936       (4 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
937       (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
938       + ClassSize.OBJECT);
939 
940   @Override
941   public long heapSize() {
942     return getCurrentSize();
943   }
944 
945   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
946     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
947     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
948         ((long)Math.ceil(maxSize*1.2/blockSize)
949             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
950         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
951   }
952 
953   @Override
954   public Iterator<CachedBlock> iterator() {
955     final Iterator<LruCachedBlock> iterator = map.values().iterator();
956 
957     return new Iterator<CachedBlock>() {
958       private final long now = System.nanoTime();
959 
960       @Override
961       public boolean hasNext() {
962         return iterator.hasNext();
963       }
964 
965       @Override
966       public CachedBlock next() {
967         final LruCachedBlock b = iterator.next();
968         return new CachedBlock() {
969           @Override
970           public String toString() {
971             return BlockCacheUtil.toString(this, now);
972           }
973 
974           @Override
975           public BlockPriority getBlockPriority() {
976             return b.getPriority();
977           }
978 
979           @Override
980           public BlockType getBlockType() {
981             return b.getBuffer().getBlockType();
982           }
983 
984           @Override
985           public long getOffset() {
986             return b.getCacheKey().getOffset();
987           }
988 
989           @Override
990           public long getSize() {
991             return b.getBuffer().heapSize();
992           }
993 
994           @Override
995           public long getCachedTime() {
996             return b.getCachedTime();
997           }
998 
999           @Override
1000           public String getFilename() {
1001             return b.getCacheKey().getHfileName();
1002           }
1003 
1004           @Override
1005           public int compareTo(CachedBlock other) {
1006             int diff = this.getFilename().compareTo(other.getFilename());
1007             if (diff != 0) return diff;
1008             diff = Long.compare(this.getOffset(), other.getOffset());
1009             if (diff != 0) return diff;
1010             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1011               throw new IllegalStateException("" + this.getCachedTime() + ", " +
1012                 other.getCachedTime());
1013             }
1014             return Long.compare(other.getCachedTime(), this.getCachedTime());
1015           }
1016 
1017           @Override
1018           public int hashCode() {
1019             return b.hashCode();
1020           }
1021 
1022           @Override
1023           public boolean equals(Object obj) {
1024             if (obj instanceof CachedBlock) {
1025               CachedBlock cb = (CachedBlock)obj;
1026               return compareTo(cb) == 0;
1027             } else {
1028               return false;
1029             }
1030           }
1031         };
1032       }
1033 
1034       @Override
1035       public void remove() {
1036         throw new UnsupportedOperationException();
1037       }
1038     };
1039   }
1040 
1041   // Simple calculators of sizes given factors and maxSize
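      // Note: the per-priority chunk sizes below are scaled by minFactor, so the three chunks
      // together add up to minSize() (the post-eviction target), not maxSize.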
1042 
1043   long acceptableSize() {
1044     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1045   }
1046   private long minSize() {
1047     return (long)Math.floor(this.maxSize * this.minFactor);
1048   }
1049   private long singleSize() {
1050     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1051   }
1052   private long multiSize() {
1053     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1054   }
1055   private long memorySize() {
1056     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1057   }
1058 
1059   public void shutdown() {
1060     if (victimHandler != null)
1061       victimHandler.shutdown();
1062     this.scheduleThreadPool.shutdown();
1063     for (int i = 0; i < 10; i++) {
1064       if (!this.scheduleThreadPool.isShutdown()) {
1065         try {
1066           Thread.sleep(10);
1067         } catch (InterruptedException e) {
1068           LOG.warn("Interrupted while sleeping");
1069           Thread.currentThread().interrupt();
1070           break;
1071         }
1072       }
1073     }
1074 
1075     if (!this.scheduleThreadPool.isShutdown()) {
1076       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1077       LOG.debug("Still running " + runnables);
1078     }
1079     this.evictionThread.shutdown();
1080   }
1081 
1082   /** Clears the cache. Used in tests. */
1083   @VisibleForTesting
1084   public void clearCache() {
1085     this.map.clear();
1086     this.elements.set(0);
1087   }
1088 
1089   /**
1090    * Used in testing. May be very inefficient.
1091    * @return the set of cached file names
1092    */
1093   @VisibleForTesting
1094   SortedSet<String> getCachedFileNamesForTest() {
1095     SortedSet<String> fileNames = new TreeSet<String>();
1096     for (BlockCacheKey cacheKey : map.keySet()) {
1097       fileNames.add(cacheKey.getHfileName());
1098     }
1099     return fileNames;
1100   }
1101 
1102   @VisibleForTesting
1103   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1104     Map<BlockType, Integer> counts =
1105         new EnumMap<BlockType, Integer>(BlockType.class);
1106     for (LruCachedBlock cb : map.values()) {
1107       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1108       Integer count = counts.get(blockType);
1109       counts.put(blockType, (count == null ? 0 : count) + 1);
1110     }
1111     return counts;
1112   }
1113 
1114   @VisibleForTesting
1115   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1116     Map<DataBlockEncoding, Integer> counts =
1117         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1118     for (LruCachedBlock block : map.values()) {
1119       DataBlockEncoding encoding =
1120               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1121       Integer count = counts.get(encoding);
1122       counts.put(encoding, (count == null ? 0 : count) + 1);
1123     }
1124     return counts;
1125   }
1126 
1127   public void setVictimCache(BlockCache handler) {
1128     assert victimHandler == null;
1129     victimHandler = handler;
1130   }
1131 
1132   @VisibleForTesting
1133   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1134     return map;
1135   }
1136 
1137   BlockCache getVictimHandler() {
1138     return this.victimHandler;
1139   }
1140 
1141   @Override
1142   public BlockCache[] getBlockCaches() {
1143     return null;
1144   }
1145 }