View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import com.google.common.base.Objects;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for
61   * scan-resistance and in-memory families 
62   * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An
63   * in-memory column family is a column family that should be served from memory if possible):
64   * single-access, multiple-accesses, and in-memory priority.
65   * A block is added with an in-memory priority flag if
66   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, 
67   * otherwise a block becomes a single access
68   * priority the first time it is read into this block cache.  If a block is accessed again while
69   * in cache, it is marked as a multiple access priority block.  This delineation of blocks is used
70   * to prevent scans from thrashing the cache adding a least-frequently-used
71   * element to the eviction algorithm.<p>
72   *
73   * Each priority is given its own chunk of the total cache to ensure
74   * fairness during eviction.  Each priority will retain close to its maximum
75   * size, however, if any priority is not using its entire chunk the others
76   * are able to grow beyond their chunk size.<p>
77   *
78   * Instantiated at a minimum with the total size and average block size.
79   * All sizes are in bytes.  The block size is not especially important as this
80   * cache is fully dynamic in its sizing of blocks.  It is only used for
81   * pre-allocating data structures and in initial heap estimation of the map.<p>
82   *
83   * The detailed constructor defines the sizes for the three priorities (they
84   * should total to the <code>maximum size</code> defined).  It also sets the levels that
85   * trigger and control the eviction thread.<p>
86   *
87   * The <code>acceptable size</code> is the cache size level which triggers the eviction
88   * process to start.  It evicts enough blocks to get the size below the
89   * minimum size specified.<p>
90   *
91   * Eviction happens in a separate thread and involves a single full-scan
92   * of the map.  It determines how many bytes must be freed to reach the minimum
93   * size, and then while scanning determines the fewest least-recently-used
94   * blocks necessary from each of the three priorities (would be 3 times bytes
95   * to free).  It then uses the priority chunk sizes to evict fairly according
96   * to the relative sizes and usage.
97   */
98  @InterfaceAudience.Private
99  @JsonIgnoreProperties({"encodingCountsForTest"})
100 public class LruBlockCache implements ResizableBlockCache, HeapSize {
101 
102   static final Log LOG = LogFactory.getLog(LruBlockCache.class);
103 
104   /**
105    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
106    * evicting during an eviction run till the cache size is down to 80% of the total.
107    */
108   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
109 
110   /**
111    * Acceptable size of cache (no evictions if size < acceptable)
112    */
113   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
114 
115   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
116   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
117   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
118 
119   /**
120    * Configuration key to force data-block always (except in-memory are too much)
121    * cached in memory for in-memory hfile, unlike inMemory, which is a column-family
122    * configuration, inMemoryForceMode is a cluster-wide configuration
123    */
124   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
125 
126   /** Default Configuration Parameters*/
127 
128   /** Backing Concurrent Map Configuration */
129   static final float DEFAULT_LOAD_FACTOR = 0.75f;
130   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
131 
132   /** Eviction thresholds */
133   static final float DEFAULT_MIN_FACTOR = 0.95f;
134   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
135 
136   /** Priority buckets */
137   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
138   static final float DEFAULT_MULTI_FACTOR = 0.50f;
139   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
140 
141   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
142 
143   /** Statistics thread */
144   static final int statThreadPeriod = 60 * 5;
145 
146   /** Concurrent map (the cache) */
147   private final Map<BlockCacheKey,LruCachedBlock> map;
148 
149   /** Eviction lock (locked when eviction in process) */
150   private final ReentrantLock evictionLock = new ReentrantLock(true);
151 
152   /** Volatile boolean to track if we are in an eviction process or not */
153   private volatile boolean evictionInProgress = false;
154 
155   /** Eviction thread */
156   private final EvictionThread evictionThread;
157 
158   /** Statistics thread schedule pool (for heavy debugging, could remove) */
159   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
160     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
161 
162   /** Current size of cache */
163   private final AtomicLong size;
164 
165   /** Current number of cached elements */
166   private final AtomicLong elements;
167 
168   /** Cache access count (sequential ID) */
169   private final AtomicLong count;
170 
171   /** Cache statistics */
172   private final CacheStats stats;
173 
174   /** Maximum allowable size of cache (block put if size > max, evict) */
175   private long maxSize;
176 
177   /** Approximate block size */
178   private long blockSize;
179 
180   /** Acceptable size of cache (no evictions if size < acceptable) */
181   private float acceptableFactor;
182 
183   /** Minimum threshold of cache (when evicting, evict until size < min) */
184   private float minFactor;
185 
186   /** Single access bucket size */
187   private float singleFactor;
188 
189   /** Multiple access bucket size */
190   private float multiFactor;
191 
192   /** In-memory bucket size */
193   private float memoryFactor;
194 
195   /** Overhead of the structure itself */
196   private long overhead;
197 
198   /** Whether in-memory hfile's data block has higher priority when evicting */
199   private boolean forceInMemory;
200 
201   /** Where to send victims (blocks evicted/missing from the cache) */
202   private BlockCache victimHandler = null;
203 
204   /**
205    * Default constructor.  Specify maximum size and expected average block
206    * size (approximation is fine).
207    *
208    * <p>All other factors will be calculated based on defaults specified in
209    * this class.
210    * @param maxSize maximum size of cache, in bytes
211    * @param blockSize approximate size of each block, in bytes
212    */
213   public LruBlockCache(long maxSize, long blockSize) {
214     this(maxSize, blockSize, true);
215   }
216 
217   /**
218    * Constructor used for testing.  Allows disabling of the eviction thread.
219    */
220   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
221     this(maxSize, blockSize, evictionThread,
222         (int)Math.ceil(1.2*maxSize/blockSize),
223         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
224         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
225         DEFAULT_SINGLE_FACTOR,
226         DEFAULT_MULTI_FACTOR,
227         DEFAULT_MEMORY_FACTOR,
228         false
229         );
230   }
231 
232   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
233     this(maxSize, blockSize, evictionThread,
234         (int)Math.ceil(1.2*maxSize/blockSize),
235         DEFAULT_LOAD_FACTOR,
236         DEFAULT_CONCURRENCY_LEVEL,
237         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
238         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
239         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
240         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
241         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
242         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
243         );
244   }
245 
246   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
247     this(maxSize, blockSize, true, conf);
248   }
249 
250   /**
251    * Configurable constructor.  Use this constructor if not using defaults.
252    * @param maxSize maximum size of this cache, in bytes
253    * @param blockSize expected average size of blocks, in bytes
254    * @param evictionThread whether to run evictions in a bg thread or not
255    * @param mapInitialSize initial size of backing ConcurrentHashMap
256    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
257    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
258    * @param minFactor percentage of total size that eviction will evict until
259    * @param acceptableFactor percentage of total size that triggers eviction
260    * @param singleFactor percentage of total size for single-access blocks
261    * @param multiFactor percentage of total size for multiple-access blocks
262    * @param memoryFactor percentage of total size for in-memory blocks
263    */
264   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
265       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
266       float minFactor, float acceptableFactor, float singleFactor,
267       float multiFactor, float memoryFactor, boolean forceInMemory) {
268     if(singleFactor + multiFactor + memoryFactor != 1 ||
269         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
270       throw new IllegalArgumentException("Single, multi, and memory factors " +
271           " should be non-negative and total 1.0");
272     }
273     if(minFactor >= acceptableFactor) {
274       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
275     }
276     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
277       throw new IllegalArgumentException("all factors must be < 1");
278     }
279     this.maxSize = maxSize;
280     this.blockSize = blockSize;
281     this.forceInMemory = forceInMemory;
282     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
283         mapLoadFactor, mapConcurrencyLevel);
284     this.minFactor = minFactor;
285     this.acceptableFactor = acceptableFactor;
286     this.singleFactor = singleFactor;
287     this.multiFactor = multiFactor;
288     this.memoryFactor = memoryFactor;
289     this.stats = new CacheStats(this.getClass().getSimpleName());
290     this.count = new AtomicLong(0);
291     this.elements = new AtomicLong(0);
292     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
293     this.size = new AtomicLong(this.overhead);
294     if(evictionThread) {
295       this.evictionThread = new EvictionThread(this);
296       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
297     } else {
298       this.evictionThread = null;
299     }
300     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
301     // every five minutes.
302     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
303         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
304   }
305 
306   @Override
307   public void setMaxSize(long maxSize) {
308     this.maxSize = maxSize;
309     if(this.size.get() > acceptableSize() && !evictionInProgress) {
310       runEviction();
311     }
312   }
313 
314   // BlockCache implementation
315 
316   /**
317    * Cache the block with the specified name and buffer.
318    * <p>
319    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
320    * this can happen, for which we compare the buffer contents.
321    * @param cacheKey block's cache key
322    * @param buf block buffer
323    * @param inMemory if block is in-memory
324    * @param cacheDataInL1
325    */
326   @Override
327   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
328       final boolean cacheDataInL1) {
329     LruCachedBlock cb = map.get(cacheKey);
330     if (cb != null) {
331       // compare the contents, if they are not equal, we are in big trouble
332       if (compare(buf, cb.getBuffer()) != 0) {
333         throw new RuntimeException("Cached block contents differ, which should not have happened."
334           + "cacheKey:" + cacheKey);
335       }
336       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
337       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
338       LOG.warn(msg);
339       return;
340     }
341     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
342     long newSize = updateSizeMetrics(cb, false);
343     map.put(cacheKey, cb);
344     long val = elements.incrementAndGet();
345     if (LOG.isTraceEnabled()) {
346       long size = map.size();
347       assertCounterSanity(size, val);
348     }
349     if (newSize > acceptableSize() && !evictionInProgress) {
350       runEviction();
351     }
352   }
353 
354   /**
355    * Sanity-checking for parity between actual block cache content and metrics.
356    * Intended only for use with TRACE level logging and -ea JVM.
357    */
358   private static void assertCounterSanity(long mapSize, long counterVal) {
359     if (counterVal < 0) {
360       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
361         ", mapSize=" + mapSize);
362       return;
363     }
364     if (mapSize < Integer.MAX_VALUE) {
365       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
366       if (pct_diff > 0.05) {
367         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
368           ", mapSize=" + mapSize);
369       }
370     }
371   }
372 
373   private int compare(Cacheable left, Cacheable right) {
374     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
375     left.serialize(l);
376     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
377     right.serialize(r);
378     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
379       r.array(), r.arrayOffset(), r.limit());
380   }
381 
382   /**
383    * Cache the block with the specified name and buffer.
384    * <p>
385    * @param cacheKey block's cache key
386    * @param buf block buffer
387    */
388   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
389     cacheBlock(cacheKey, buf, false, false);
390   }
391 
392   /**
393    * Helper function that updates the local size counter and also updates any
394    * per-cf or per-blocktype metrics it can discern from given
395    * {@link LruCachedBlock}
396    *
397    * @param cb
398    * @param evict
399    */
400   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
401     long heapsize = cb.heapSize();
402     if (evict) {
403       heapsize *= -1;
404     }
405     return size.addAndGet(heapsize);
406   }
407 
408   /**
409    * Get the buffer of the block with the specified name.
410    * @param cacheKey block's cache key
411    * @param caching true if the caller caches blocks on cache misses
412    * @param repeat Whether this is a repeat lookup for the same block
413    *        (used to avoid double counting cache misses when doing double-check locking)
414    * @param updateCacheMetrics Whether to update cache metrics or not
415    * @return buffer of specified cache key, or null if not in cache
416    */
417   @Override
418   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
419       boolean updateCacheMetrics) {
420     LruCachedBlock cb = map.get(cacheKey);
421     if (cb == null) {
422       if (!repeat && updateCacheMetrics) stats.miss(caching);
423       // If there is another block cache then try and read there.
424       // However if this is a retry ( second time in double checked locking )
425       // And it's already a miss then the l2 will also be a miss.
426       if (victimHandler != null && !repeat) {
427         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
428 
429         // Promote this to L1.
430         if (result != null && caching) {
431           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
432         }
433         return result;
434       }
435       return null;
436     }
437     if (updateCacheMetrics) stats.hit(caching);
438     cb.access(count.incrementAndGet());
439     return cb.getBuffer();
440   }
441 
442   /**
443    * Whether the cache contains block with specified cacheKey
444    * @param cacheKey
445    * @return true if contains the block
446    */
447   public boolean containsBlock(BlockCacheKey cacheKey) {
448     return map.containsKey(cacheKey);
449   }
450 
451   @Override
452   public boolean evictBlock(BlockCacheKey cacheKey) {
453     LruCachedBlock cb = map.get(cacheKey);
454     if (cb == null) return false;
455     evictBlock(cb, false);
456     return true;
457   }
458 
459   /**
460    * Evicts all blocks for a specific HFile. This is an
461    * expensive operation implemented as a linear-time search through all blocks
462    * in the cache. Ideally this should be a search in a log-access-time map.
463    *
464    * <p>
465    * This is used for evict-on-close to remove all blocks of a specific HFile.
466    *
467    * @return the number of blocks evicted
468    */
469   @Override
470   public int evictBlocksByHfileName(String hfileName) {
471     int numEvicted = 0;
472     for (BlockCacheKey key : map.keySet()) {
473       if (key.getHfileName().equals(hfileName)) {
474         if (evictBlock(key))
475           ++numEvicted;
476       }
477     }
478     if (victimHandler != null) {
479       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
480     }
481     return numEvicted;
482   }
483 
484   /**
485    * Evict the block, and it will be cached by the victim handler if exists &&
486    * block may be read again later
487    * @param block
488    * @param evictedByEvictionProcess true if the given block is evicted by
489    *          EvictionThread
490    * @return the heap size of evicted block
491    */
492   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
493     map.remove(block.getCacheKey());
494     updateSizeMetrics(block, true);
495     long val = elements.decrementAndGet();
496     if (LOG.isTraceEnabled()) {
497       long size = map.size();
498       assertCounterSanity(size, val);
499     }
500     stats.evicted(block.getCachedTime());
501     if (evictedByEvictionProcess && victimHandler != null) {
502       if (victimHandler instanceof BucketCache) {
503         boolean wait = getCurrentSize() < acceptableSize();
504         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
505         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
506             inMemory, wait);
507       } else {
508         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
509       }
510     }
511     return block.heapSize();
512   }
513 
514   /**
515    * Multi-threaded call to run the eviction process.
516    */
517   private void runEviction() {
518     if(evictionThread == null) {
519       evict();
520     } else {
521       evictionThread.evict();
522     }
523   }
524 
525   /**
526    * Eviction method.
527    */
528   void evict() {
529 
530     // Ensure only one eviction at a time
531     if(!evictionLock.tryLock()) return;
532 
533     try {
534       evictionInProgress = true;
535       long currentSize = this.size.get();
536       long bytesToFree = currentSize - minSize();
537 
538       if (LOG.isTraceEnabled()) {
539         LOG.trace("Block cache LRU eviction started; Attempting to free " +
540           StringUtils.byteDesc(bytesToFree) + " of total=" +
541           StringUtils.byteDesc(currentSize));
542       }
543 
544       if(bytesToFree <= 0) return;
545 
546       // Instantiate priority buckets
547       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
548           singleSize());
549       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
550           multiSize());
551       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
552           memorySize());
553 
554       // Scan entire map putting into appropriate buckets
555       for(LruCachedBlock cachedBlock : map.values()) {
556         switch(cachedBlock.getPriority()) {
557           case SINGLE: {
558             bucketSingle.add(cachedBlock);
559             break;
560           }
561           case MULTI: {
562             bucketMulti.add(cachedBlock);
563             break;
564           }
565           case MEMORY: {
566             bucketMemory.add(cachedBlock);
567             break;
568           }
569         }
570       }
571 
572       long bytesFreed = 0;
573       if (forceInMemory || memoryFactor > 0.999f) {
574         long s = bucketSingle.totalSize();
575         long m = bucketMulti.totalSize();
576         if (bytesToFree > (s + m)) {
577           // this means we need to evict blocks in memory bucket to make room,
578           // so the single and multi buckets will be emptied
579           bytesFreed = bucketSingle.free(s);
580           bytesFreed += bucketMulti.free(m);
581           if (LOG.isTraceEnabled()) {
582             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
583               " from single and multi buckets");
584           }
585           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
586           if (LOG.isTraceEnabled()) {
587             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
588               " total from all three buckets ");
589           }
590         } else {
591           // this means no need to evict block in memory bucket,
592           // and we try best to make the ratio between single-bucket and
593           // multi-bucket is 1:2
594           long bytesRemain = s + m - bytesToFree;
595           if (3 * s <= bytesRemain) {
596             // single-bucket is small enough that no eviction happens for it
597             // hence all eviction goes from multi-bucket
598             bytesFreed = bucketMulti.free(bytesToFree);
599           } else if (3 * m <= 2 * bytesRemain) {
600             // multi-bucket is small enough that no eviction happens for it
601             // hence all eviction goes from single-bucket
602             bytesFreed = bucketSingle.free(bytesToFree);
603           } else {
604             // both buckets need to evict some blocks
605             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
606             if (bytesFreed < bytesToFree) {
607               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
608             }
609           }
610         }
611       } else {
612         PriorityQueue<BlockBucket> bucketQueue =
613           new PriorityQueue<BlockBucket>(3);
614 
615         bucketQueue.add(bucketSingle);
616         bucketQueue.add(bucketMulti);
617         bucketQueue.add(bucketMemory);
618 
619         int remainingBuckets = 3;
620 
621         BlockBucket bucket;
622         while((bucket = bucketQueue.poll()) != null) {
623           long overflow = bucket.overflow();
624           if(overflow > 0) {
625             long bucketBytesToFree = Math.min(overflow,
626                 (bytesToFree - bytesFreed) / remainingBuckets);
627             bytesFreed += bucket.free(bucketBytesToFree);
628           }
629           remainingBuckets--;
630         }
631       }
632 
633       if (LOG.isTraceEnabled()) {
634         long single = bucketSingle.totalSize();
635         long multi = bucketMulti.totalSize();
636         long memory = bucketMemory.totalSize();
637         LOG.trace("Block cache LRU eviction completed; " +
638           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
639           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
640           "single=" + StringUtils.byteDesc(single) + ", " +
641           "multi=" + StringUtils.byteDesc(multi) + ", " +
642           "memory=" + StringUtils.byteDesc(memory));
643       }
644     } finally {
645       stats.evict();
646       evictionInProgress = false;
647       evictionLock.unlock();
648     }
649   }
650 
651   @Override
652   public String toString() {
653     return Objects.toStringHelper(this)
654       .add("blockCount", getBlockCount())
655       .add("currentSize", getCurrentSize())
656       .add("freeSize", getFreeSize())
657       .add("maxSize", getMaxSize())
658       .add("heapSize", heapSize())
659       .add("minSize", minSize())
660       .add("minFactor", minFactor)
661       .add("multiSize", multiSize())
662       .add("multiFactor", multiFactor)
663       .add("singleSize", singleSize())
664       .add("singleFactor", singleFactor)
665       .toString();
666   }
667 
668   /**
669    * Used to group blocks into priority buckets.  There will be a BlockBucket
670    * for each priority (single, multi, memory).  Once bucketed, the eviction
671    * algorithm takes the appropriate number of elements out of each according
672    * to configuration parameters and their relatives sizes.
673    */
674   private class BlockBucket implements Comparable<BlockBucket> {
675 
676     private final String name;
677     private LruCachedBlockQueue queue;
678     private long totalSize = 0;
679     private long bucketSize;
680 
681     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
682       this.name = name;
683       this.bucketSize = bucketSize;
684       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
685       totalSize = 0;
686     }
687 
688     public void add(LruCachedBlock block) {
689       totalSize += block.heapSize();
690       queue.add(block);
691     }
692 
693     public long free(long toFree) {
694       if (LOG.isTraceEnabled()) {
695         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
696       }
697       LruCachedBlock cb;
698       long freedBytes = 0;
699       while ((cb = queue.pollLast()) != null) {
700         freedBytes += evictBlock(cb, true);
701         if (freedBytes >= toFree) {
702           return freedBytes;
703         }
704       }
705       if (LOG.isTraceEnabled()) {
706         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
707       }
708       return freedBytes;
709     }
710 
711     public long overflow() {
712       return totalSize - bucketSize;
713     }
714 
715     public long totalSize() {
716       return totalSize;
717     }
718 
719     public int compareTo(BlockBucket that) {
720       return Long.compare(this.overflow(), that.overflow());
721     }
722 
723     @Override
724     public boolean equals(Object that) {
725       if (that == null || !(that instanceof BlockBucket)){
726         return false;
727       }
728       return compareTo((BlockBucket)that) == 0;
729     }
730 
731     @Override
732     public int hashCode() {
733       return Objects.hashCode(name, bucketSize, queue, totalSize);
734     }
735 
736     @Override
737     public String toString() {
738       return Objects.toStringHelper(this)
739         .add("name", name)
740         .add("totalSize", StringUtils.byteDesc(totalSize))
741         .add("bucketSize", StringUtils.byteDesc(bucketSize))
742         .toString();
743     }
744   }
745 
746   /**
747    * Get the maximum size of this cache.
748    * @return max size in bytes
749    */
750   public long getMaxSize() {
751     return this.maxSize;
752   }
753 
754   @Override
755   public long getCurrentSize() {
756     return this.size.get();
757   }
758 
759   @Override
760   public long getFreeSize() {
761     return getMaxSize() - getCurrentSize();
762   }
763 
764   @Override
765   public long size() {
766     return getMaxSize();
767   }
768 
769   @Override
770   public long getBlockCount() {
771     return this.elements.get();
772   }
773 
774   EvictionThread getEvictionThread() {
775     return this.evictionThread;
776   }
777 
778   /*
779    * Eviction thread.  Sits in waiting state until an eviction is triggered
780    * when the cache size grows above the acceptable level.<p>
781    *
782    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
783    */
784   static class EvictionThread extends HasThread {
785     private WeakReference<LruBlockCache> cache;
786     private volatile boolean go = true;
787     // flag set after enter the run method, used for test
788     private boolean enteringRun = false;
789 
790     public EvictionThread(LruBlockCache cache) {
791       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
792       setDaemon(true);
793       this.cache = new WeakReference<LruBlockCache>(cache);
794     }
795 
796     @Override
797     public void run() {
798       enteringRun = true;
799       while (this.go) {
800         synchronized(this) {
801           try {
802             this.wait(1000 * 10/*Don't wait for ever*/);
803           } catch(InterruptedException e) {
804             LOG.warn("Interrupted eviction thread ", e);
805             Thread.currentThread().interrupt();
806           }
807         }
808         LruBlockCache cache = this.cache.get();
809         if (cache == null) break;
810         cache.evict();
811       }
812     }
813 
814     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
815         justification="This is what we want")
816     public void evict() {
817       synchronized(this) {
818         this.notifyAll();
819       }
820     }
821 
822     synchronized void shutdown() {
823       this.go = false;
824       this.notifyAll();
825     }
826 
827     /**
828      * Used for the test.
829      */
830     boolean isEnteringRun() {
831       return this.enteringRun;
832     }
833   }
834 
835   /*
836    * Statistics thread.  Periodically prints the cache statistics to the log.
837    */
838   static class StatisticsThread extends Thread {
839     private final LruBlockCache lru;
840 
841     public StatisticsThread(LruBlockCache lru) {
842       super("LruBlockCacheStats");
843       setDaemon(true);
844       this.lru = lru;
845     }
846 
847     @Override
848     public void run() {
849       lru.logStats();
850     }
851   }
852 
853   public void logStats() {
854     // Log size
855     long totalSize = heapSize();
856     long freeSize = maxSize - totalSize;
857     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
858         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
859         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
860         "blockCount=" + getBlockCount() + ", " +
861         "accesses=" + stats.getRequestCount() + ", " +
862         "hits=" + stats.getHitCount() + ", " +
863         "hitRatio=" + (stats.getHitCount() == 0 ?
864           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
865         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
866         "cachingHits=" + stats.getHitCachingCount() + ", " +
867         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
868           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
869         "evictions=" + stats.getEvictionCount() + ", " +
870         "evicted=" + stats.getEvictedCount() + ", " +
871         "evictedPerRun=" + stats.evictedPerEviction());
872   }
873 
874   /**
875    * Get counter statistics for this cache.
876    *
877    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
878    * of the eviction processes.
879    */
880   public CacheStats getStats() {
881     return this.stats;
882   }
883 
884   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
885       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
886       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
887       + ClassSize.OBJECT);
888 
889   @Override
890   public long heapSize() {
891     return getCurrentSize();
892   }
893 
894   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
895     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
896     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
897         ((long)Math.ceil(maxSize*1.2/blockSize)
898             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
899         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
900   }
901 
902   @Override
903   public Iterator<CachedBlock> iterator() {
904     final Iterator<LruCachedBlock> iterator = map.values().iterator();
905 
906     return new Iterator<CachedBlock>() {
907       private final long now = System.nanoTime();
908 
909       @Override
910       public boolean hasNext() {
911         return iterator.hasNext();
912       }
913 
914       @Override
915       public CachedBlock next() {
916         final LruCachedBlock b = iterator.next();
917         return new CachedBlock() {
918           @Override
919           public String toString() {
920             return BlockCacheUtil.toString(this, now);
921           }
922 
923           @Override
924           public BlockPriority getBlockPriority() {
925             return b.getPriority();
926           }
927 
928           @Override
929           public BlockType getBlockType() {
930             return b.getBuffer().getBlockType();
931           }
932 
933           @Override
934           public long getOffset() {
935             return b.getCacheKey().getOffset();
936           }
937 
938           @Override
939           public long getSize() {
940             return b.getBuffer().heapSize();
941           }
942 
943           @Override
944           public long getCachedTime() {
945             return b.getCachedTime();
946           }
947 
948           @Override
949           public String getFilename() {
950             return b.getCacheKey().getHfileName();
951           }
952 
953           @Override
954           public int compareTo(CachedBlock other) {
955             int diff = this.getFilename().compareTo(other.getFilename());
956             if (diff != 0) return diff;
957             diff = Long.compare(this.getOffset(), other.getOffset());
958             if (diff != 0) return diff;
959             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
960               throw new IllegalStateException("" + this.getCachedTime() + ", " +
961                 other.getCachedTime());
962             }
963             return Long.compare(other.getCachedTime(), this.getCachedTime());
964           }
965 
966           @Override
967           public int hashCode() {
968             return b.hashCode();
969           }
970 
971           @Override
972           public boolean equals(Object obj) {
973             if (obj instanceof CachedBlock) {
974               CachedBlock cb = (CachedBlock)obj;
975               return compareTo(cb) == 0;
976             } else {
977               return false;
978             }
979           }
980         };
981       }
982 
983       @Override
984       public void remove() {
985         throw new UnsupportedOperationException();
986       }
987     };
988   }
989 
990   // Simple calculators of sizes given factors and maxSize
991 
992   long acceptableSize() {
993     return (long)Math.floor(this.maxSize * this.acceptableFactor);
994   }
995   private long minSize() {
996     return (long)Math.floor(this.maxSize * this.minFactor);
997   }
998   private long singleSize() {
999     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1000   }
1001   private long multiSize() {
1002     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1003   }
1004   private long memorySize() {
1005     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1006   }
1007 
1008   public void shutdown() {
1009     if (victimHandler != null)
1010       victimHandler.shutdown();
1011     this.scheduleThreadPool.shutdown();
1012     for (int i = 0; i < 10; i++) {
1013       if (!this.scheduleThreadPool.isShutdown()) {
1014         try {
1015           Thread.sleep(10);
1016         } catch (InterruptedException e) {
1017           LOG.warn("Interrupted while sleeping");
1018           Thread.currentThread().interrupt();
1019           break;
1020         }
1021       }
1022     }
1023 
1024     if (!this.scheduleThreadPool.isShutdown()) {
1025       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1026       LOG.debug("Still running " + runnables);
1027     }
1028     this.evictionThread.shutdown();
1029   }
1030 
1031   /** Clears the cache. Used in tests. */
1032   @VisibleForTesting
1033   public void clearCache() {
1034     this.map.clear();
1035     this.elements.set(0);
1036   }
1037 
1038   /**
1039    * Used in testing. May be very inefficient.
1040    * @return the set of cached file names
1041    */
1042   @VisibleForTesting
1043   SortedSet<String> getCachedFileNamesForTest() {
1044     SortedSet<String> fileNames = new TreeSet<String>();
1045     for (BlockCacheKey cacheKey : map.keySet()) {
1046       fileNames.add(cacheKey.getHfileName());
1047     }
1048     return fileNames;
1049   }
1050 
1051   @VisibleForTesting
1052   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1053     Map<BlockType, Integer> counts =
1054         new EnumMap<BlockType, Integer>(BlockType.class);
1055     for (LruCachedBlock cb : map.values()) {
1056       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1057       Integer count = counts.get(blockType);
1058       counts.put(blockType, (count == null ? 0 : count) + 1);
1059     }
1060     return counts;
1061   }
1062 
1063   @VisibleForTesting
1064   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1065     Map<DataBlockEncoding, Integer> counts =
1066         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1067     for (LruCachedBlock block : map.values()) {
1068       DataBlockEncoding encoding =
1069               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1070       Integer count = counts.get(encoding);
1071       counts.put(encoding, (count == null ? 0 : count) + 1);
1072     }
1073     return counts;
1074   }
1075 
1076   public void setVictimCache(BlockCache handler) {
1077     assert victimHandler == null;
1078     victimHandler = handler;
1079   }
1080 
1081   @VisibleForTesting
1082   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1083     return map;
1084   }
1085 
1086   BlockCache getVictimHandler() {
1087     return this.victimHandler;
1088   }
1089 
1090   @Override
1091   public BlockCache[] getBlockCaches() {
1092     return null;
1093   }
1094 }