1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import com.google.common.base.Objects;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory column
61   * families ({@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}; an
62   * in-memory column family is one that should be served from memory if possible):
63   * single-access, multiple-access, and in-memory priority.
64   * A block is added with in-memory priority if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is set; otherwise a block is
66   * given single-access priority the first time it is read into this block cache.  If a block is
67   * accessed again while in the cache, it is marked as a multiple-access block.  This
68   * delineation of blocks effectively adds a least-frequently-used element to the eviction
69   * algorithm and keeps scans from thrashing the cache.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then while scanning determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (so up to 3 times the bytes
93   * to free may be selected as candidates).  It then uses the priority chunk sizes to
94   * evict fairly according to the relative sizes and usage.
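     *
     * <p>A minimal usage sketch (illustrative only; the sizes and the <code>cacheKey</code> and
     * <code>block</code> values below are placeholders, not taken from this class):
     * <pre>
     *   LruBlockCache cache = new LruBlockCache(512 * 1024 * 1024L, 64 * 1024L);
     *   cache.cacheBlock(cacheKey, block);   // first insert: single-access priority
     *   Cacheable c = cache.getBlock(cacheKey, true, false, true); // repeated access promotes to multi-access
     * </pre>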
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
104    * evicting during an eviction run till the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
114   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
115   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
116 
117   /**
118    * Configuration key to force data blocks of in-memory hfiles to always stay cached
119    * (unless the in-memory blocks alone would exceed the cache). Unlike inMemory, which is a
120    * column-family configuration, inMemoryForceMode is a cluster-wide configuration.
121    */
122   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
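      // Illustrative only: these keys are normally set in hbase-site.xml, e.g.
      //   <property><name>hbase.lru.blockcache.min.factor</name><value>0.90</value></property>
      // or on the Configuration handed to the Configuration-based constructor below, e.g.
      //   conf.setFloat(LRU_MIN_FACTOR_CONFIG_NAME, 0.90f);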
123 
124   /** Default Configuration Parameters */
125 
126   /** Backing Concurrent Map Configuration */
127   static final float DEFAULT_LOAD_FACTOR = 0.75f;
128   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129 
130   /** Eviction thresholds */
131   static final float DEFAULT_MIN_FACTOR = 0.95f;
132   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
133 
134   /** Priority buckets */
135   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136   static final float DEFAULT_MULTI_FACTOR = 0.50f;
137   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138 
139   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
140 
141   /** Statistics thread */
142   static final int statThreadPeriod = 60 * 5;
143 
144   /** Concurrent map (the cache) */
145   private final Map<BlockCacheKey,LruCachedBlock> map;
146 
147   /** Eviction lock (locked when eviction in process) */
148   private final ReentrantLock evictionLock = new ReentrantLock(true);
149 
150   /** Volatile boolean to track if we are in an eviction process or not */
151   private volatile boolean evictionInProgress = false;
152 
153   /** Eviction thread */
154   private final EvictionThread evictionThread;
155 
156   /** Statistics thread schedule pool (for heavy debugging, could remove) */
157   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
158     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
159 
160   /** Current size of cache */
161   private final AtomicLong size;
162 
163   /** Current number of cached elements */
164   private final AtomicLong elements;
165 
166   /** Cache access count (sequential ID) */
167   private final AtomicLong count;
168 
169   /** Cache statistics */
170   private final CacheStats stats;
171 
172   /** Maximum allowable size of cache (if a block put pushes size over max, evict) */
173   private long maxSize;
174 
175   /** Approximate block size */
176   private long blockSize;
177 
178   /** Acceptable size of cache (no evictions if size < acceptable) */
179   private float acceptableFactor;
180 
181   /** Minimum threshold of cache (when evicting, evict until size < min) */
182   private float minFactor;
183 
184   /** Single access bucket size */
185   private float singleFactor;
186 
187   /** Multiple access bucket size */
188   private float multiFactor;
189 
190   /** In-memory bucket size */
191   private float memoryFactor;
192 
193   /** Overhead of the structure itself */
194   private long overhead;
195 
196   /** Whether data blocks of in-memory hfiles get higher retention priority when evicting */
197   private boolean forceInMemory;
198 
199   /** Where to send victims (blocks evicted/missing from the cache) */
200   // TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
201   private BucketCache victimHandler = null;
202 
203   /**
204    * Default constructor.  Specify maximum size and expected average block
205    * size (approximation is fine).
206    *
207    * <p>All other factors will be calculated based on defaults specified in
208    * this class.
209    * @param maxSize maximum size of cache, in bytes
210    * @param blockSize approximate size of each block, in bytes
211    */
212   public LruBlockCache(long maxSize, long blockSize) {
213     this(maxSize, blockSize, true);
214   }
215 
216   /**
217    * Constructor used for testing.  Allows disabling of the eviction thread.
218    */
219   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
220     this(maxSize, blockSize, evictionThread,
221         (int)Math.ceil(1.2*maxSize/blockSize),
222         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
223         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
224         DEFAULT_SINGLE_FACTOR,
225         DEFAULT_MULTI_FACTOR,
226         DEFAULT_MEMORY_FACTOR,
227         false
228         );
229   }
230 
231   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
232     this(maxSize, blockSize, evictionThread,
233         (int)Math.ceil(1.2*maxSize/blockSize),
234         DEFAULT_LOAD_FACTOR,
235         DEFAULT_CONCURRENCY_LEVEL,
236         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
237         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
238         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
239         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
240         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
241         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
242         );
243   }
244 
245   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
246     this(maxSize, blockSize, true, conf);
247   }
248 
249   /**
250    * Configurable constructor.  Use this constructor if not using defaults.
251    * @param maxSize maximum size of this cache, in bytes
252    * @param blockSize expected average size of blocks, in bytes
253    * @param evictionThread whether to run evictions in a bg thread or not
254    * @param mapInitialSize initial size of backing ConcurrentHashMap
255    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
256    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
257    * @param minFactor percentage of total size that eviction will evict until
258    * @param acceptableFactor percentage of total size that triggers eviction
259    * @param singleFactor percentage of total size for single-access blocks
260    * @param multiFactor percentage of total size for multiple-access blocks
261    * @param memoryFactor percentage of total size for in-memory blocks
262    */
263   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
264       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
265       float minFactor, float acceptableFactor, float singleFactor,
266       float multiFactor, float memoryFactor, boolean forceInMemory) {
267     if(singleFactor + multiFactor + memoryFactor != 1 ||
268         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
269       throw new IllegalArgumentException("Single, multi, and memory factors " +
270           "should be non-negative and total 1.0");
271     }
272     if(minFactor >= acceptableFactor) {
273       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
274     }
275     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
276       throw new IllegalArgumentException("all factors must be < 1");
277     }
278     this.maxSize = maxSize;
279     this.blockSize = blockSize;
280     this.forceInMemory = forceInMemory;
281     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
282         mapLoadFactor, mapConcurrencyLevel);
283     this.minFactor = minFactor;
284     this.acceptableFactor = acceptableFactor;
285     this.singleFactor = singleFactor;
286     this.multiFactor = multiFactor;
287     this.memoryFactor = memoryFactor;
288     this.stats = new CacheStats(this.getClass().getSimpleName());
289     this.count = new AtomicLong(0);
290     this.elements = new AtomicLong(0);
291     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
292     this.size = new AtomicLong(this.overhead);
293     if(evictionThread) {
294       this.evictionThread = new EvictionThread(this);
295       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
296     } else {
297       this.evictionThread = null;
298     }
299     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
300     // every five minutes.
301     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
302         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
303   }
304 
305   @Override
306   public void setMaxSize(long maxSize) {
307     this.maxSize = maxSize;
308     if(this.size.get() > acceptableSize() && !evictionInProgress) {
309       runEviction();
310     }
311   }
312 
313   // BlockCache implementation
314 
315   /**
316    * Cache the block with the specified name and buffer.
317    * <p>
318    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
319    * this can happen, for which we compare the buffer contents.
320    * @param cacheKey block's cache key
321    * @param buf block buffer
322    * @param inMemory if block is in-memory
323    * @param cacheDataInL1 if this is a multi-tier deployment, cache the data only in the L1 tier (unused by this implementation)
324    */
325   @Override
326   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
327       final boolean cacheDataInL1) {
328     LruCachedBlock cb = map.get(cacheKey);
329     if (cb != null) {
330       // compare the contents, if they are not equal, we are in big trouble
331       if (compare(buf, cb.getBuffer()) != 0) {
332         throw new RuntimeException("Cached block contents differ, which should not have happened. "
333           + "cacheKey:" + cacheKey);
334       }
335       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
336       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
337       LOG.warn(msg);
338       return;
339     }
340     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
341     long newSize = updateSizeMetrics(cb, false);
342     map.put(cacheKey, cb);
343     long val = elements.incrementAndGet();
344     if (LOG.isTraceEnabled()) {
345       long size = map.size();
346       assertCounterSanity(size, val);
347     }
348     if (newSize > acceptableSize() && !evictionInProgress) {
349       runEviction();
350     }
351   }
352 
353   /**
354    * Sanity-checking for parity between actual block cache content and metrics.
355    * Intended only for use with TRACE level logging and -ea JVM.
356    */
357   private static void assertCounterSanity(long mapSize, long counterVal) {
358     if (counterVal < 0) {
359       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
360         ", mapSize=" + mapSize);
361       return;
362     }
363     if (mapSize < Integer.MAX_VALUE) {
364       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
365       if (pct_diff > 0.05) {
366         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
367           ", mapSize=" + mapSize);
368       }
369     }
370   }
371 
372   private int compare(Cacheable left, Cacheable right) {
373     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
374     left.serialize(l);
375     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
376     right.serialize(r);
377     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
378       r.array(), r.arrayOffset(), r.limit());
379   }
380 
381   /**
382    * Cache the block with the specified name and buffer.
383    * <p>
384    * @param cacheKey block's cache key
385    * @param buf block buffer
386    */
387   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
388     cacheBlock(cacheKey, buf, false, false);
389   }
390 
391   /**
392    * Helper function that updates the local size counter with the heap size of the given
393    * {@link LruCachedBlock}, adding it when the block is cached and subtracting it on evict.
394    *
395    * @param cb the block being cached or evicted
396    * @param evict true if the block is being evicted (its heap size is then subtracted)
397    * @return the updated total cache size
398    */
399   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
400     long heapsize = cb.heapSize();
401     if (evict) {
402       heapsize *= -1;
403     }
404     return size.addAndGet(heapsize);
405   }
406 
407   /**
408    * Get the buffer of the block with the specified name.
409    * @param cacheKey block's cache key
410    * @param caching true if the caller caches blocks on cache misses
411    * @param repeat Whether this is a repeat lookup for the same block
412    *        (used to avoid double counting cache misses when doing double-check locking)
413    * @param updateCacheMetrics Whether to update cache metrics or not
414    * @return buffer of specified cache key, or null if not in cache
415    */
416   @Override
417   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
418       boolean updateCacheMetrics) {
419     LruCachedBlock cb = map.get(cacheKey);
420     if (cb == null) {
421       if (!repeat && updateCacheMetrics) stats.miss(caching);
422       if (victimHandler != null) {
423         return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
424       }
425       return null;
426     }
427     if (updateCacheMetrics) stats.hit(caching);
428     cb.access(count.incrementAndGet());
429     return cb.getBuffer();
430   }
431 
432   /**
433    * Whether the cache contains a block with the specified cacheKey
434    * @param cacheKey block's cache key
435    * @return true if the cache contains the block
436    */
437   public boolean containsBlock(BlockCacheKey cacheKey) {
438     return map.containsKey(cacheKey);
439   }
440 
441   @Override
442   public boolean evictBlock(BlockCacheKey cacheKey) {
443     LruCachedBlock cb = map.get(cacheKey);
444     if (cb == null) return false;
445     evictBlock(cb, false);
446     return true;
447   }
448 
449   /**
450    * Evicts all blocks for a specific HFile. This is an
451    * expensive operation implemented as a linear-time search through all blocks
452    * in the cache. Ideally this should be a search in a log-access-time map.
453    *
454    * <p>
455    * This is used for evict-on-close to remove all blocks of a specific HFile.
456    *
457    * @return the number of blocks evicted
458    */
459   @Override
460   public int evictBlocksByHfileName(String hfileName) {
461     int numEvicted = 0;
462     for (BlockCacheKey key : map.keySet()) {
463       if (key.getHfileName().equals(hfileName)) {
464         if (evictBlock(key))
465           ++numEvicted;
466       }
467     }
468     if (victimHandler != null) {
469       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
470     }
471     return numEvicted;
472   }
473 
474   /**
475    * Evict the block; if it was evicted by the eviction process (so it may be read again
476    * later) and a victim handler exists, the block is handed to the victim handler.
477    * @param block the block to evict
478    * @param evictedByEvictionProcess true if the given block is evicted by
479    *          EvictionThread
480    * @return the heap size of the evicted block
481    */
482   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
483     map.remove(block.getCacheKey());
484     updateSizeMetrics(block, true);
485     long val = elements.decrementAndGet();
486     if (LOG.isTraceEnabled()) {
487       long size = map.size();
488       assertCounterSanity(size, val);
489     }
490     stats.evicted(block.getCachedTime());
491     if (evictedByEvictionProcess && victimHandler != null) {
492       boolean wait = getCurrentSize() < acceptableSize();
493       boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
494       victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
495           inMemory, wait);
496     }
497     return block.heapSize();
498   }
499 
500   /**
501    * Multi-threaded call to run the eviction process.
502    */
503   private void runEviction() {
504     if(evictionThread == null) {
505       evict();
506     } else {
507       evictionThread.evict();
508     }
509   }
510 
511   /**
512    * Eviction method.
513    */
514   void evict() {
515 
516     // Ensure only one eviction at a time
517     if(!evictionLock.tryLock()) return;
518 
519     try {
520       evictionInProgress = true;
521       long currentSize = this.size.get();
522       long bytesToFree = currentSize - minSize();
523 
524       if (LOG.isTraceEnabled()) {
525         LOG.trace("Block cache LRU eviction started; Attempting to free " +
526           StringUtils.byteDesc(bytesToFree) + " of total=" +
527           StringUtils.byteDesc(currentSize));
528       }
529 
530       if(bytesToFree <= 0) return;
531 
532       // Instantiate priority buckets
533       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
534           singleSize());
535       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
536           multiSize());
537       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
538           memorySize());
539 
540       // Scan entire map putting into appropriate buckets
541       for(LruCachedBlock cachedBlock : map.values()) {
542         switch(cachedBlock.getPriority()) {
543           case SINGLE: {
544             bucketSingle.add(cachedBlock);
545             break;
546           }
547           case MULTI: {
548             bucketMulti.add(cachedBlock);
549             break;
550           }
551           case MEMORY: {
552             bucketMemory.add(cachedBlock);
553             break;
554           }
555         }
556       }
557 
558       long bytesFreed = 0;
559       if (forceInMemory || memoryFactor > 0.999f) {
560         long s = bucketSingle.totalSize();
561         long m = bucketMulti.totalSize();
562         if (bytesToFree > (s + m)) {
563           // this means we need to evict blocks in memory bucket to make room,
564           // so the single and multi buckets will be emptied
565           bytesFreed = bucketSingle.free(s);
566           bytesFreed += bucketMulti.free(m);
567           if (LOG.isTraceEnabled()) {
568             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
569               " from single and multi buckets");
570           }
571           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
572           if (LOG.isTraceEnabled()) {
573             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
574               " total from all three buckets ");
575           }
576         } else {
577           // this means no need to evict block in memory bucket,
578           // and we try best to make the ratio between single-bucket and
579           // multi-bucket is 1:2
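              // Worked example with made-up numbers: s=300MB, m=500MB, bytesToFree=200MB gives
              // bytesRemain=600MB; neither shortcut below applies (3*s=900 > 600, 3*m=1500 > 1200),
              // so the single bucket frees s - bytesRemain/3 = 100MB and the multi bucket frees the
              // remaining 100MB, leaving 200MB and 400MB, i.e. the 1:2 target.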
580           long bytesRemain = s + m - bytesToFree;
581           if (3 * s <= bytesRemain) {
582             // single-bucket is small enough that no eviction happens for it
583             // hence all eviction goes from multi-bucket
584             bytesFreed = bucketMulti.free(bytesToFree);
585           } else if (3 * m <= 2 * bytesRemain) {
586             // multi-bucket is small enough that no eviction happens for it
587             // hence all eviction goes from single-bucket
588             bytesFreed = bucketSingle.free(bytesToFree);
589           } else {
590             // both buckets need to evict some blocks
591             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
592             if (bytesFreed < bytesToFree) {
593               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
594             }
595           }
596         }
597       } else {
598         PriorityQueue<BlockBucket> bucketQueue =
599           new PriorityQueue<BlockBucket>(3);
600 
601         bucketQueue.add(bucketSingle);
602         bucketQueue.add(bucketMulti);
603         bucketQueue.add(bucketMemory);
604 
605         int remainingBuckets = 3;
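            // Worked example with made-up numbers: bytesToFree=300MB and bucket overflows of
            // -50MB (single), 200MB (multi) and 250MB (memory). Buckets are polled in ascending
            // overflow order: single frees nothing, multi frees min(200, 300/2) = 150MB (two
            // buckets remain), and memory frees min(250, 150/1) = 150MB, so the overflowing
            // buckets share the load.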
606 
607         BlockBucket bucket;
608         while((bucket = bucketQueue.poll()) != null) {
609           long overflow = bucket.overflow();
610           if(overflow > 0) {
611             long bucketBytesToFree = Math.min(overflow,
612                 (bytesToFree - bytesFreed) / remainingBuckets);
613             bytesFreed += bucket.free(bucketBytesToFree);
614           }
615           remainingBuckets--;
616         }
617       }
618 
619       if (LOG.isTraceEnabled()) {
620         long single = bucketSingle.totalSize();
621         long multi = bucketMulti.totalSize();
622         long memory = bucketMemory.totalSize();
623         LOG.trace("Block cache LRU eviction completed; " +
624           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
625           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
626           "single=" + StringUtils.byteDesc(single) + ", " +
627           "multi=" + StringUtils.byteDesc(multi) + ", " +
628           "memory=" + StringUtils.byteDesc(memory));
629       }
630     } finally {
631       stats.evict();
632       evictionInProgress = false;
633       evictionLock.unlock();
634     }
635   }
636 
637   @Override
638   public String toString() {
639     return Objects.toStringHelper(this)
640       .add("blockCount", getBlockCount())
641       .add("currentSize", getCurrentSize())
642       .add("freeSize", getFreeSize())
643       .add("maxSize", getMaxSize())
644       .add("heapSize", heapSize())
645       .add("minSize", minSize())
646       .add("minFactor", minFactor)
647       .add("multiSize", multiSize())
648       .add("multiFactor", multiFactor)
649       .add("singleSize", singleSize())
650       .add("singleFactor", singleFactor)
651       .toString();
652   }
653 
654   /**
655    * Used to group blocks into priority buckets.  There will be a BlockBucket
656    * for each priority (single, multi, memory).  Once bucketed, the eviction
657    * algorithm takes the appropriate number of elements out of each according
658    * to configuration parameters and their relative sizes.
659    */
660   private class BlockBucket implements Comparable<BlockBucket> {
661 
662     private final String name;
663     private LruCachedBlockQueue queue;
664     private long totalSize = 0;
665     private long bucketSize;
666 
667     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
668       this.name = name;
669       this.bucketSize = bucketSize;
670       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
671       totalSize = 0;
672     }
673 
674     public void add(LruCachedBlock block) {
675       totalSize += block.heapSize();
676       queue.add(block);
677     }
678 
679     public long free(long toFree) {
680       if (LOG.isTraceEnabled()) {
681         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
682       }
683       LruCachedBlock cb;
684       long freedBytes = 0;
685       while ((cb = queue.pollLast()) != null) {
686         freedBytes += evictBlock(cb, true);
687         if (freedBytes >= toFree) {
688           return freedBytes;
689         }
690       }
691       if (LOG.isTraceEnabled()) {
692         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
693       }
694       return freedBytes;
695     }
696 
697     public long overflow() {
698       return totalSize - bucketSize;
699     }
700 
701     public long totalSize() {
702       return totalSize;
703     }
704 
705     public int compareTo(BlockBucket that) {
706       if(this.overflow() == that.overflow()) return 0;
707       return this.overflow() > that.overflow() ? 1 : -1;
708     }
709 
710     @Override
711     public boolean equals(Object that) {
712       if (that == null || !(that instanceof BlockBucket)){
713         return false;
714       }
715       return compareTo((BlockBucket)that) == 0;
716     }
717 
718     @Override
719     public int hashCode() {
720       return Objects.hashCode(name, bucketSize, queue, totalSize);
721     }
722 
723     @Override
724     public String toString() {
725       return Objects.toStringHelper(this)
726         .add("name", name)
727         .add("totalSize", StringUtils.byteDesc(totalSize))
728         .add("bucketSize", StringUtils.byteDesc(bucketSize))
729         .toString();
730     }
731   }
732 
733   /**
734    * Get the maximum size of this cache.
735    * @return max size in bytes
736    */
737   public long getMaxSize() {
738     return this.maxSize;
739   }
740 
741   @Override
742   public long getCurrentSize() {
743     return this.size.get();
744   }
745 
746   @Override
747   public long getFreeSize() {
748     return getMaxSize() - getCurrentSize();
749   }
750 
751   @Override
752   public long size() {
753     return getMaxSize();
754   }
755 
756   @Override
757   public long getBlockCount() {
758     return this.elements.get();
759   }
760 
761   EvictionThread getEvictionThread() {
762     return this.evictionThread;
763   }
764 
765   /*
766    * Eviction thread.  Sits in waiting state until an eviction is triggered
767    * when the cache size grows above the acceptable level.<p>
768    *
769    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
770    */
771   static class EvictionThread extends HasThread {
772     private WeakReference<LruBlockCache> cache;
773     private volatile boolean go = true;
774     // flag set after entering the run method; used by tests
775     private boolean enteringRun = false;
776 
777     public EvictionThread(LruBlockCache cache) {
778       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
779       setDaemon(true);
780       this.cache = new WeakReference<LruBlockCache>(cache);
781     }
782 
783     @Override
784     public void run() {
785       enteringRun = true;
786       while (this.go) {
787         synchronized(this) {
788           try {
789             this.wait(1000 * 10/*Don't wait forever*/);
790           } catch(InterruptedException e) {}
791         }
792         LruBlockCache cache = this.cache.get();
793         if (cache == null) break;
794         cache.evict();
795       }
796     }
797 
798     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
799         justification="This is what we want")
800     public void evict() {
801       synchronized(this) {
802         this.notifyAll();
803       }
804     }
805 
806     synchronized void shutdown() {
807       this.go = false;
808       this.notifyAll();
809     }
810 
811     /**
812      * Used by tests.
813      */
814     boolean isEnteringRun() {
815       return this.enteringRun;
816     }
817   }
818 
819   /*
820    * Statistics thread.  Periodically prints the cache statistics to the log.
821    */
822   static class StatisticsThread extends Thread {
823     private final LruBlockCache lru;
824 
825     public StatisticsThread(LruBlockCache lru) {
826       super("LruBlockCacheStats");
827       setDaemon(true);
828       this.lru = lru;
829     }
830 
831     @Override
832     public void run() {
833       lru.logStats();
834     }
835   }
836 
837   public void logStats() {
838     // Log size
839     long totalSize = heapSize();
840     long freeSize = maxSize - totalSize;
841     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
842         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
843         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
844         "blockCount=" + getBlockCount() + ", " +
845         "accesses=" + stats.getRequestCount() + ", " +
846         "hits=" + stats.getHitCount() + ", " +
847         "hitRatio=" + (stats.getHitCount() == 0 ?
848         "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
849         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
850         "cachingHits=" + stats.getHitCachingCount() + ", " +
851         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
852           "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
853         "evictions=" + stats.getEvictionCount() + ", " +
854         "evicted=" + stats.getEvictedCount() + ", " +
855         "evictedPerRun=" + stats.evictedPerEviction());
856   }
857 
858   /**
859    * Get counter statistics for this cache.
860    *
861    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
862    * of the eviction processes.
863    */
864   public CacheStats getStats() {
865     return this.stats;
866   }
867 
868   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
869       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
870       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
871       + ClassSize.OBJECT);
872 
873   @Override
874   public long heapSize() {
875     return getCurrentSize();
876   }
877 
878   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
879     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
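        // For example (illustrative numbers only): with maxSize=1GB and blockSize=64KB the estimate
        // counts ceil(1.2 * maxSize / blockSize) = 19661 map entries, plus the fixed class overhead,
        // the map itself, and one segment per unit of concurrency.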
880     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
881         ((long)Math.ceil(maxSize*1.2/blockSize)
882             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
883         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
884   }
885 
886   @Override
887   public Iterator<CachedBlock> iterator() {
888     final Iterator<LruCachedBlock> iterator = map.values().iterator();
889 
890     return new Iterator<CachedBlock>() {
891       private final long now = System.nanoTime();
892 
893       @Override
894       public boolean hasNext() {
895         return iterator.hasNext();
896       }
897 
898       @Override
899       public CachedBlock next() {
900         final LruCachedBlock b = iterator.next();
901         return new CachedBlock() {
902           @Override
903           public String toString() {
904             return BlockCacheUtil.toString(this, now);
905           }
906 
907           @Override
908           public BlockPriority getBlockPriority() {
909             return b.getPriority();
910           }
911 
912           @Override
913           public BlockType getBlockType() {
914             return b.getBuffer().getBlockType();
915           }
916 
917           @Override
918           public long getOffset() {
919             return b.getCacheKey().getOffset();
920           }
921 
922           @Override
923           public long getSize() {
924             return b.getBuffer().heapSize();
925           }
926 
927           @Override
928           public long getCachedTime() {
929             return b.getCachedTime();
930           }
931 
932           @Override
933           public String getFilename() {
934             return b.getCacheKey().getHfileName();
935           }
936 
937           @Override
938           public int compareTo(CachedBlock other) {
939             int diff = this.getFilename().compareTo(other.getFilename());
940             if (diff != 0) return diff;
941             diff = Long.compare(this.getOffset(), other.getOffset()); // avoid int overflow
942             if (diff != 0) return diff;
943             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
944               throw new IllegalStateException("" + this.getCachedTime() + ", " +
945                 other.getCachedTime());
946             }
947             return Long.compare(other.getCachedTime(), this.getCachedTime());
948           }
949 
950           @Override
951           public int hashCode() {
952             return b.hashCode();
953           }
954 
955           @Override
956           public boolean equals(Object obj) {
957             if (obj instanceof CachedBlock) {
958               CachedBlock cb = (CachedBlock)obj;
959               return compareTo(cb) == 0;
960             } else {
961               return false;
962             }
963           }
964         };
965       }
966 
967       @Override
968       public void remove() {
969         throw new UnsupportedOperationException();
970       }
971     };
972   }
973 
974   // Simple calculators of sizes given factors and maxSize
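      // For example, with the defaults (illustrative only): maxSize=1GB gives an eviction trigger
      // (acceptableSize) of ~0.99GB, an eviction target (minSize) of ~0.95GB, and per-priority
      // chunks of roughly 0.24GB single, 0.475GB multi and 0.24GB memory (factor * minFactor).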
975 
976   long acceptableSize() {
977     return (long)Math.floor(this.maxSize * this.acceptableFactor);
978   }
979   private long minSize() {
980     return (long)Math.floor(this.maxSize * this.minFactor);
981   }
982   private long singleSize() {
983     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
984   }
985   private long multiSize() {
986     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
987   }
988   private long memorySize() {
989     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
990   }
991 
992   public void shutdown() {
993     if (victimHandler != null)
994       victimHandler.shutdown();
995     this.scheduleThreadPool.shutdown();
996     for (int i = 0; i < 10; i++) {
997       if (!this.scheduleThreadPool.isShutdown()) {
998         try {
999           Thread.sleep(10);
1000         } catch (InterruptedException e) {
1001           LOG.warn("Interrupted while sleeping");
1002           Thread.currentThread().interrupt();
1003           break;
1004         }
1005       }
1006     }
1007 
1008     if (!this.scheduleThreadPool.isShutdown()) {
1009       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1010       LOG.debug("Still running " + runnables);
1011     }
1012     if (this.evictionThread != null) this.evictionThread.shutdown();
1013   }
1014 
1015   /** Clears the cache. Used in tests. */
1016   @VisibleForTesting
1017   public void clearCache() {
1018     this.map.clear();
1019     this.elements.set(0);
1020   }
1021 
1022   /**
1023    * Used in testing. May be very inefficient.
1024    * @return the set of cached file names
1025    */
1026   @VisibleForTesting
1027   SortedSet<String> getCachedFileNamesForTest() {
1028     SortedSet<String> fileNames = new TreeSet<String>();
1029     for (BlockCacheKey cacheKey : map.keySet()) {
1030       fileNames.add(cacheKey.getHfileName());
1031     }
1032     return fileNames;
1033   }
1034 
1035   @VisibleForTesting
1036   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1037     Map<BlockType, Integer> counts =
1038         new EnumMap<BlockType, Integer>(BlockType.class);
1039     for (LruCachedBlock cb : map.values()) {
1040       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1041       Integer count = counts.get(blockType);
1042       counts.put(blockType, (count == null ? 0 : count) + 1);
1043     }
1044     return counts;
1045   }
1046 
1047   @VisibleForTesting
1048   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1049     Map<DataBlockEncoding, Integer> counts =
1050         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1051     for (LruCachedBlock block : map.values()) {
1052       DataBlockEncoding encoding =
1053               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1054       Integer count = counts.get(encoding);
1055       counts.put(encoding, (count == null ? 0 : count) + 1);
1056     }
1057     return counts;
1058   }
1059 
1060   public void setVictimCache(BucketCache handler) {
1061     assert victimHandler == null;
1062     victimHandler = handler;
1063   }
1064 
1065   @VisibleForTesting
1066   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1067     return map;
1068   }
1069 
1070   BucketCache getVictimHandler() {
1071     return this.victimHandler;
1072   }
1073 
1074   @Override
1075   public BlockCache[] getBlockCaches() {
1076     return null;
1077   }
1078 }