
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Objects;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory column
61   * families ({@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}; an
62   * in-memory column family is a column family that should be served from memory if possible):
63   * single-access, multiple-access, and in-memory priority.
64   * A block is added with in-memory priority if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is true; otherwise a block
66   * is given single-access priority the first time it is read into this block cache.  If a block
67   * is accessed again while in the cache, it is marked as a multiple-access priority block.  This
68   * delineation of blocks is used to prevent scans from thrashing the cache, effectively adding
69   * a least-frequently-used element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
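 * <p>For illustration only, a minimal construction sketch (the sizes below are example
 * values, not recommendations):
 * <pre>
 *   // 1 GB cache with an expected average block size of 64 KB; all other
 *   // factors fall back to the DEFAULT_* values defined in this class.
 *   LruBlockCache cache = new LruBlockCache(1024L * 1024L * 1024L, 64L * 1024L);
 * </pre>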
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then, while scanning, collects the fewest least-recently-used
92   * blocks necessary from each of the three priorities (in total, up to three
93   * times the bytes to free).  It then uses the priority chunk sizes to evict
94   * fairly according to the relative sizes and usage.
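 *
 * <p>As a rough worked example (assuming the DEFAULT_MIN_FACTOR and
 * DEFAULT_ACCEPTABLE_FACTOR values below): with a 1 GB cache, eviction is triggered once
 * the size exceeds 0.99 * 1024 MB (about 1014 MB) and frees blocks until the size drops
 * below 0.95 * 1024 MB (about 973 MB), i.e. roughly 41 MB per eviction run.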
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
104    * evicting during an eviction run till the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   /**
114    * Hard capacity limit of cache, will reject any put if size > this * acceptable
115    */
116   static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.hard.capacity.limit.factor";
117   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
118   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
119   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
120
121   /**
122    * Configuration key to force the data blocks of in-memory hfiles to always be cached
123    * (unless the in-memory blocks alone take up too much of the cache). Unlike inMemory, which
124    * is a per-column-family configuration, inMemoryForceMode is a cluster-wide configuration.
125    */
126   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
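
  // For illustration only: these keys can be tuned through a Configuration before the cache
  // is constructed, e.g. (the values below are arbitrary examples, not recommendations):
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setFloat(LRU_MIN_FACTOR_CONFIG_NAME, 0.90f);
  //   conf.setFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, 0.95f);
  //   LruBlockCache cache = new LruBlockCache(maxSize, blockSize, true, conf);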
127
128   /** Default Configuration Parameters*/
129 
130   /** Backing Concurrent Map Configuration */
131   static final float DEFAULT_LOAD_FACTOR = 0.75f;
132   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
133 
134   /** Eviction thresholds */
135   static final float DEFAULT_MIN_FACTOR = 0.95f;
136   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
137
138   /** Priority buckets */
139   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
140   static final float DEFAULT_MULTI_FACTOR = 0.50f;
141   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
142
143   /** default hard capacity limit */
144   static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
145 
146   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
147
148   /** Statistics thread */
149   static final int statThreadPeriod = 60 * 5;
150   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
151   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
152 
153   /** Concurrent map (the cache) */
154   private final Map<BlockCacheKey,LruCachedBlock> map;
155 
156   /** Eviction lock (locked when eviction in process) */
157   private final ReentrantLock evictionLock = new ReentrantLock(true);
158   private final long maxBlockSize;
159
160   /** Volatile boolean to track if we are in an eviction process or not */
161   private volatile boolean evictionInProgress = false;
162 
163   /** Eviction thread */
164   private final EvictionThread evictionThread;
165 
166   /** Statistics thread schedule pool (for heavy debugging, could remove) */
167   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
168     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
169
170   /** Current size of cache */
171   private final AtomicLong size;
172
173   /** Current number of cached elements */
174   private final AtomicLong elements;
175
176   /** Cache access count (sequential ID) */
177   private final AtomicLong count;
178
179   /** hard capacity limit */
180   private float hardCapacityLimitFactor;
181
182   /** Cache statistics */
183   private final CacheStats stats;
184
185   /** Maximum allowable size of cache (if a block put takes the size over this, eviction is run) */
186   private long maxSize;
187
188   /** Approximate block size */
189   private long blockSize;
190
191   /** Acceptable size of cache (no evictions if size < acceptable) */
192   private float acceptableFactor;
193
194   /** Minimum threshold of cache (when evicting, evict until size < min) */
195   private float minFactor;
196
197   /** Single access bucket size */
198   private float singleFactor;
199
200   /** Multiple access bucket size */
201   private float multiFactor;
202
203   /** In-memory bucket size */
204   private float memoryFactor;
205
206   /** Overhead of the structure itself */
207   private long overhead;
208
209   /** Whether in-memory hfile's data block has higher priority when evicting */
210   private boolean forceInMemory;
211
212   /** Where to send victims (blocks evicted/missing from the cache) */
213   private BlockCache victimHandler = null;
214
215   /**
216    * Default constructor.  Specify maximum size and expected average block
217    * size (approximation is fine).
218    *
219    * <p>All other factors will be calculated based on defaults specified in
220    * this class.
221    * @param maxSize maximum size of cache, in bytes
222    * @param blockSize approximate size of each block, in bytes
223    */
224   public LruBlockCache(long maxSize, long blockSize) {
225     this(maxSize, blockSize, true);
226   }
227
228   /**
229    * Constructor used for testing.  Allows disabling of the eviction thread.
230    */
231   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
232     this(maxSize, blockSize, evictionThread,
233         (int)Math.ceil(1.2*maxSize/blockSize),
234         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
235         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
236         DEFAULT_SINGLE_FACTOR,
237         DEFAULT_MULTI_FACTOR,
238         DEFAULT_MEMORY_FACTOR,
239         DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
240         false,
241         DEFAULT_MAX_BLOCK_SIZE
242         );
243   }
244
245   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
246     this(maxSize, blockSize, evictionThread,
247         (int)Math.ceil(1.2*maxSize/blockSize),
248         DEFAULT_LOAD_FACTOR,
249         DEFAULT_CONCURRENCY_LEVEL,
250         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
251         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
252         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
253         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
254         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
255         conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
256         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
257         conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
258         );
259   }
260
261   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
262     this(maxSize, blockSize, true, conf);
263   }
264
265   /**
266    * Configurable constructor.  Use this constructor if not using defaults.
267    * @param maxSize maximum size of this cache, in bytes
268    * @param blockSize expected average size of blocks, in bytes
269    * @param evictionThread whether to run evictions in a bg thread or not
270    * @param mapInitialSize initial size of backing ConcurrentHashMap
271    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
272    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
273    * @param minFactor percentage of total size that eviction will evict until
274    * @param acceptableFactor percentage of total size that triggers eviction
275    * @param singleFactor percentage of total size for single-access blocks
276    * @param multiFactor percentage of total size for multiple-access blocks
277    * @param memoryFactor percentage of total size for in-memory blocks
278    */
279   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
280       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
281       float minFactor, float acceptableFactor, float singleFactor,
282       float multiFactor, float memoryFactor, float hardLimitFactor,
283       boolean forceInMemory, long maxBlockSize) {
284     this.maxBlockSize = maxBlockSize;
285     if(singleFactor + multiFactor + memoryFactor != 1 ||
286         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
287       throw new IllegalArgumentException("Single, multi, and memory factors " +
288           "should be non-negative and total 1.0");
289     }
290     if(minFactor >= acceptableFactor) {
291       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
292     }
293     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
294       throw new IllegalArgumentException("all factors must be < 1");
295     }
296     this.maxSize = maxSize;
297     this.blockSize = blockSize;
298     this.forceInMemory = forceInMemory;
299     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
300         mapLoadFactor, mapConcurrencyLevel);
301     this.minFactor = minFactor;
302     this.acceptableFactor = acceptableFactor;
303     this.singleFactor = singleFactor;
304     this.multiFactor = multiFactor;
305     this.memoryFactor = memoryFactor;
306     this.stats = new CacheStats(this.getClass().getSimpleName());
307     this.count = new AtomicLong(0);
308     this.elements = new AtomicLong(0);
309     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
310     this.size = new AtomicLong(this.overhead);
311     this.hardCapacityLimitFactor = hardLimitFactor;
312     if(evictionThread) {
313       this.evictionThread = new EvictionThread(this);
314       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
315     } else {
316       this.evictionThread = null;
317     }
318     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
319     // every five minutes.
320     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
321         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
322   }
323
324   @Override
325   public void setMaxSize(long maxSize) {
326     this.maxSize = maxSize;
327     if(this.size.get() > acceptableSize() && !evictionInProgress) {
328       runEviction();
329     }
330   }
331
332   // BlockCache implementation
333 
334   /**
335    * Cache the block with the specified name and buffer.
336    * <p>
337    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
338    * this can happen, for which we compare the buffer contents.
339    * @param cacheKey block's cache key
340    * @param buf block buffer
341    * @param inMemory if block is in-memory
342    * @param cacheDataInL1 whether to cache the block in L1; not used by this implementation
343    */
344   @Override
345   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
346       final boolean cacheDataInL1) {
347 
348     if (buf.heapSize() > maxBlockSize) {
349       // If there are a lot of blocks that are too
350       // big this can make the logs way too noisy,
351       // so we only log every 50th failure (2%).
352       if (stats.failInsert() % 50 == 0) {
353         LOG.warn("Trying to cache too large a block "
354             + cacheKey.getHfileName() + " @ "
355             + cacheKey.getOffset()
356             + " is " + buf.heapSize()
357             + " which is larger than " + maxBlockSize);
358       }
359       return;
360     }
361
362     LruCachedBlock cb = map.get(cacheKey);
363     if (cb != null) {
364       // compare the contents, if they are not equal, we are in big trouble
365       if (compare(buf, cb.getBuffer()) != 0) {
366         throw new RuntimeException("Cached block contents differ, which should not have happened. "
367           + "cacheKey:" + cacheKey);
368       }
369       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
370       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
371       LOG.warn(msg);
372       return;
373     }
374     long currentSize = size.get();
375     long currentAcceptableSize = acceptableSize();
376     long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
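    // With the default factors this is 1.2 * 0.99 * maxSize, i.e. roughly 1.19 * maxSize.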
377     if (currentSize >= hardLimitSize) {
378       stats.failInsert();
379       if (LOG.isTraceEnabled()) {
380         LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
381           + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " by too much;"
382           + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
383           + cacheKey + " into LruBlockCache.");
384       }
385       if (!evictionInProgress) {
386         runEviction();
387       }
388       return;
389     }
390     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
391     long newSize = updateSizeMetrics(cb, false);
392     map.put(cacheKey, cb);
393     long val = elements.incrementAndGet();
394     if (LOG.isTraceEnabled()) {
395       long size = map.size();
396       assertCounterSanity(size, val);
397     }
398     if (newSize > currentAcceptableSize && !evictionInProgress) {
399       runEviction();
400     }
401   }
402
403   /**
404    * Sanity-checking for parity between actual block cache content and metrics.
405    * Intended only for use with TRACE level logging and -ea JVM.
406    */
407   private static void assertCounterSanity(long mapSize, long counterVal) {
408     if (counterVal < 0) {
409       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
410         ", mapSize=" + mapSize);
411       return;
412     }
413     if (mapSize < Integer.MAX_VALUE) {
414       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
415       if (pct_diff > 0.05) {
416         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
417           ", mapSize=" + mapSize);
418       }
419     }
420   }
421
422   private int compare(Cacheable left, Cacheable right) {
423     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
424     left.serialize(l);
425     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
426     right.serialize(r);
427     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
428       r.array(), r.arrayOffset(), r.limit());
429   }
430
431   /**
432    * Cache the block with the specified name and buffer.
433    * <p>
434    * @param cacheKey block's cache key
435    * @param buf block buffer
436    */
437   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
438     cacheBlock(cacheKey, buf, false, false);
439   }
440
441   /**
442    * Helper function that updates the local size counter by the heap size of the
443    * given {@link LruCachedBlock}.
444    *
445    * @param cb the block whose heap size is added to (or subtracted from) the cache size
446    * @param evict true if the block is being evicted, so its size is subtracted
447    * @return the new cache size after the update
448    */
449   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
450     long heapsize = cb.heapSize();
451     if (evict) {
452       heapsize *= -1;
453     }
454     return size.addAndGet(heapsize);
455   }
456
457   /**
458    * Get the buffer of the block with the specified name.
459    * @param cacheKey block's cache key
460    * @param caching true if the caller caches blocks on cache misses
461    * @param repeat Whether this is a repeat lookup for the same block
462    *        (used to avoid double counting cache misses when doing double-check locking)
463    * @param updateCacheMetrics Whether to update cache metrics or not
464    * @return buffer of specified cache key, or null if not in cache
465    */
466   @Override
467   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
468       boolean updateCacheMetrics) {
469     LruCachedBlock cb = map.get(cacheKey);
470     if (cb == null) {
471       if (!repeat && updateCacheMetrics) {
472         stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
473       }
474       // If there is another block cache (L2), try to read from there.
475       // However, if this is a retry (the second pass of double-checked locking)
476       // and it is already a miss, then the L2 will also be a miss.
477       if (victimHandler != null && !repeat) {
478         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
479
480         // Promote this to L1.
481         if (result != null && caching) {
482           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
483         }
484         return result;
485       }
486       return null;
487     }
488     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
489     cb.access(count.incrementAndGet());
490     return cb.getBuffer();
491   }
492
493   /**
494    * Whether the cache contains block with specified cacheKey
495    * @param cacheKey the block's cache key
496    * @return true if the cache contains a block with this key
497    */
498   public boolean containsBlock(BlockCacheKey cacheKey) {
499     return map.containsKey(cacheKey);
500   }
501
502   @Override
503   public boolean evictBlock(BlockCacheKey cacheKey) {
504     LruCachedBlock cb = map.get(cacheKey);
505     if (cb == null) return false;
506     return evictBlock(cb, false) > 0;
507   }
508
509   /**
510    * Evicts all blocks for a specific HFile. This is an
511    * expensive operation implemented as a linear-time search through all blocks
512    * in the cache. Ideally this should be a search in a log-access-time map.
513    *
514    * <p>
515    * This is used for evict-on-close to remove all blocks of a specific HFile.
516    *
517    * @return the number of blocks evicted
518    */
519   @Override
520   public int evictBlocksByHfileName(String hfileName) {
521     int numEvicted = 0;
522     for (BlockCacheKey key : map.keySet()) {
523       if (key.getHfileName().equals(hfileName)) {
524         if (evictBlock(key))
525           ++numEvicted;
526       }
527     }
528     if (victimHandler != null) {
529       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
530     }
531     return numEvicted;
532   }
533
534   /**
535    * Evict the block; if a victim handler exists and the block may be read again later,
536    * the block is handed off to the victim handler (e.g. an L2 cache).
537    * @param block the block to evict
538    * @param evictedByEvictionProcess true if the given block is evicted by
539    *          EvictionThread
540    * @return the heap size of the evicted block, or 0 if the block was not in the cache
541    */
542   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
543     boolean found = map.remove(block.getCacheKey()) != null;
544     if (!found) {
545       return 0;
546     }
547     updateSizeMetrics(block, true);
548     long val = elements.decrementAndGet();
549     if (LOG.isTraceEnabled()) {
550       long size = map.size();
551       assertCounterSanity(size, val);
552     }
553     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
554     if (evictedByEvictionProcess && victimHandler != null) {
555       if (victimHandler instanceof BucketCache) {
556         boolean wait = getCurrentSize() < acceptableSize();
557         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
558         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
559             inMemory, wait);
560       } else {
561         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
562       }
563     }
564     return block.heapSize();
565   }
566
567   /**
568    * Trigger eviction: run it inline if there is no eviction thread, otherwise wake that thread.
569    */
570   private void runEviction() {
571     if(evictionThread == null) {
572       evict();
573     } else {
574       evictionThread.evict();
575     }
576   }
577
578   @VisibleForTesting
579   boolean isEvictionInProgress() {
580     return evictionInProgress;
581   }
582
583   @VisibleForTesting
584   long getOverhead() {
585     return overhead;
586   }
587
588   /**
589    * Eviction method.
590    */
591   void evict() {
592
593     // Ensure only one eviction at a time
594     if(!evictionLock.tryLock()) return;
595
596     try {
597       evictionInProgress = true;
598       long currentSize = this.size.get();
599       long bytesToFree = currentSize - minSize();
600
601       if (LOG.isTraceEnabled()) {
602         LOG.trace("Block cache LRU eviction started; Attempting to free " +
603           StringUtils.byteDesc(bytesToFree) + " of total=" +
604           StringUtils.byteDesc(currentSize));
605       }
606
607       if(bytesToFree <= 0) return;
608
609       // Instantiate priority buckets
610       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
611           singleSize());
612       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
613           multiSize());
614       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
615           memorySize());
616
617       // Scan entire map putting into appropriate buckets
618       for(LruCachedBlock cachedBlock : map.values()) {
619         switch(cachedBlock.getPriority()) {
620           case SINGLE: {
621             bucketSingle.add(cachedBlock);
622             break;
623           }
624           case MULTI: {
625             bucketMulti.add(cachedBlock);
626             break;
627           }
628           case MEMORY: {
629             bucketMemory.add(cachedBlock);
630             break;
631           }
632         }
633       }
634
635       long bytesFreed = 0;
636       if (forceInMemory || memoryFactor > 0.999f) {
637         long s = bucketSingle.totalSize();
638         long m = bucketMulti.totalSize();
639         if (bytesToFree > (s + m)) {
640           // this means we need to evict blocks in memory bucket to make room,
641           // so the single and multi buckets will be emptied
642           bytesFreed = bucketSingle.free(s);
643           bytesFreed += bucketMulti.free(m);
644           if (LOG.isTraceEnabled()) {
645             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
646               " from single and multi buckets");
647           }
648           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
649           if (LOG.isTraceEnabled()) {
650             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
651               " total from all three buckets ");
652           }
653         } else {
654           // this means there is no need to evict blocks from the memory bucket,
655           // and we try our best to make the post-eviction ratio between the
656           // single bucket and the multi bucket 1:2
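          // (Derivation: if s' and m' are the post-eviction sizes, we want
          // s' + m' = bytesRemain with s' : m' = 1 : 2, i.e. s' = bytesRemain / 3 and
          // m' = 2 * bytesRemain / 3; the branches below handle the cases where one
          // bucket is already at or below its target.)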
657           long bytesRemain = s + m - bytesToFree;
658           if (3 * s <= bytesRemain) {
659             // single-bucket is small enough that no eviction happens for it
660             // hence all eviction goes from multi-bucket
661             bytesFreed = bucketMulti.free(bytesToFree);
662           } else if (3 * m <= 2 * bytesRemain) {
663             // multi-bucket is small enough that no eviction happens for it
664             // hence all eviction goes from single-bucket
665             bytesFreed = bucketSingle.free(bytesToFree);
666           } else {
667             // both buckets need to evict some blocks
668             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
669             if (bytesFreed < bytesToFree) {
670               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
671             }
672           }
673         }
674       } else {
675         PriorityQueue<BlockBucket> bucketQueue =
676           new PriorityQueue<BlockBucket>(3);
677
678         bucketQueue.add(bucketSingle);
679         bucketQueue.add(bucketMulti);
680         bucketQueue.add(bucketMemory);
681
682         int remainingBuckets = 3;
683
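        // Buckets are polled in order of increasing overflow; each frees at most its own
        // overflow and at most an equal share of the bytes still to be freed, so buckets
        // that are further over their chunk shoulder more of the eviction.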
684         BlockBucket bucket;
685         while((bucket = bucketQueue.poll()) != null) {
686           long overflow = bucket.overflow();
687           if(overflow > 0) {
688             long bucketBytesToFree = Math.min(overflow,
689                 (bytesToFree - bytesFreed) / remainingBuckets);
690             bytesFreed += bucket.free(bucketBytesToFree);
691           }
692           remainingBuckets--;
693         }
694       }
695       if (LOG.isTraceEnabled()) {
696         long single = bucketSingle.totalSize();
697         long multi = bucketMulti.totalSize();
698         long memory = bucketMemory.totalSize();
699         LOG.trace("Block cache LRU eviction completed; " +
700           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
701           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
702           "single=" + StringUtils.byteDesc(single) + ", " +
703           "multi=" + StringUtils.byteDesc(multi) + ", " +
704           "memory=" + StringUtils.byteDesc(memory));
705       }
706     } finally {
707       stats.evict();
708       evictionInProgress = false;
709       evictionLock.unlock();
710     }
711   }
712
713   @Override
714   public String toString() {
715     return Objects.toStringHelper(this)
716       .add("blockCount", getBlockCount())
717       .add("currentSize", getCurrentSize())
718       .add("freeSize", getFreeSize())
719       .add("maxSize", getMaxSize())
720       .add("heapSize", heapSize())
721       .add("minSize", minSize())
722       .add("minFactor", minFactor)
723       .add("multiSize", multiSize())
724       .add("multiFactor", multiFactor)
725       .add("singleSize", singleSize())
726       .add("singleFactor", singleFactor)
727       .toString();
728   }
729
730   /**
731    * Used to group blocks into priority buckets.  There will be a BlockBucket
732    * for each priority (single, multi, memory).  Once bucketed, the eviction
733    * algorithm takes the appropriate number of elements out of each according
734    * to configuration parameters and their relative sizes.
735    */
736   private class BlockBucket implements Comparable<BlockBucket> {
737
738     private final String name;
739     private LruCachedBlockQueue queue;
740     private long totalSize = 0;
741     private long bucketSize;
742
743     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
744       this.name = name;
745       this.bucketSize = bucketSize;
746       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
747       totalSize = 0;
748     }
749
750     public void add(LruCachedBlock block) {
751       totalSize += block.heapSize();
752       queue.add(block);
753     }
754
755     public long free(long toFree) {
756       if (LOG.isTraceEnabled()) {
757         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
758       }
759       LruCachedBlock cb;
760       long freedBytes = 0;
761       while ((cb = queue.pollLast()) != null) {
762         freedBytes += evictBlock(cb, true);
763         if (freedBytes >= toFree) {
764           return freedBytes;
765         }
766       }
767       if (LOG.isTraceEnabled()) {
768         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
769       }
770       return freedBytes;
771     }
772
773     public long overflow() {
774       return totalSize - bucketSize;
775     }
776
777     public long totalSize() {
778       return totalSize;
779     }
780
781     public int compareTo(BlockBucket that) {
782       return Long.compare(this.overflow(), that.overflow());
783     }
784
785     @Override
786     public boolean equals(Object that) {
787       if (that == null || !(that instanceof BlockBucket)){
788         return false;
789       }
790       return compareTo((BlockBucket)that) == 0;
791     }
792
793     @Override
794     public int hashCode() {
795       return Objects.hashCode(name, bucketSize, queue, totalSize);
796     }
797
798     @Override
799     public String toString() {
800       return Objects.toStringHelper(this)
801         .add("name", name)
802         .add("totalSize", StringUtils.byteDesc(totalSize))
803         .add("bucketSize", StringUtils.byteDesc(bucketSize))
804         .toString();
805     }
806   }
807
808   /**
809    * Get the maximum size of this cache.
810    * @return max size in bytes
811    */
812   public long getMaxSize() {
813     return this.maxSize;
814   }
815
816   @Override
817   public long getCurrentSize() {
818     return this.size.get();
819   }
820
821   @Override
822   public long getFreeSize() {
823     return getMaxSize() - getCurrentSize();
824   }
825
826   @Override
827   public long size() {
828     return getMaxSize();
829   }
830
831   @Override
832   public long getBlockCount() {
833     return this.elements.get();
834   }
835
836   EvictionThread getEvictionThread() {
837     return this.evictionThread;
838   }
839
840   /*
841    * Eviction thread.  Sits in waiting state until an eviction is triggered
842    * when the cache size grows above the acceptable level.<p>
843    *
844    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
845    */
846   static class EvictionThread extends HasThread {
847     private WeakReference<LruBlockCache> cache;
848     private volatile boolean go = true;
849     // flag set after enter the run method, used for test
850     private boolean enteringRun = false;
851
852     public EvictionThread(LruBlockCache cache) {
853       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
854       setDaemon(true);
855       this.cache = new WeakReference<LruBlockCache>(cache);
856     }
857
858     @Override
859     public void run() {
860       enteringRun = true;
861       while (this.go) {
862         synchronized(this) {
863           try {
864             this.wait(1000 * 10/*Don't wait for ever*/);
865           } catch(InterruptedException e) {
866             LOG.warn("Interrupted eviction thread ", e);
867             Thread.currentThread().interrupt();
868           }
869         }
870         LruBlockCache cache = this.cache.get();
871         if (cache == null) break;
872         cache.evict();
873       }
874     }
875
876     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
877         justification="This is what we want")
878     public void evict() {
879       synchronized(this) {
880         this.notifyAll();
881       }
882     }
883
884     synchronized void shutdown() {
885       this.go = false;
886       this.notifyAll();
887     }
888
889     /**
890      * Used in tests.
891      */
892     boolean isEnteringRun() {
893       return this.enteringRun;
894     }
895   }
896
897   /*
898    * Statistics thread.  Periodically prints the cache statistics to the log.
899    */
900   static class StatisticsThread extends Thread {
901     private final LruBlockCache lru;
902
903     public StatisticsThread(LruBlockCache lru) {
904       super("LruBlockCacheStats");
905       setDaemon(true);
906       this.lru = lru;
907     }
908
909     @Override
910     public void run() {
911       lru.logStats();
912     }
913   }
914
915   public void logStats() {
916     // Log size
917     long totalSize = heapSize();
918     long freeSize = maxSize - totalSize;
919     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
920         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
921         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
922         "blockCount=" + getBlockCount() + ", " +
923         "accesses=" + stats.getRequestCount() + ", " +
924         "hits=" + stats.getHitCount() + ", " +
925         "hitRatio=" + (stats.getHitCount() == 0 ?
926           "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
927         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
928         "cachingHits=" + stats.getHitCachingCount() + ", " +
929         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
930           "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
931         "evictions=" + stats.getEvictionCount() + ", " +
932         "evicted=" + stats.getEvictedCount() + ", " +
933         "evictedPerRun=" + stats.evictedPerEviction());
934   }
935
936   /**
937    * Get counter statistics for this cache.
938    *
939    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
940    * of the eviction processes.
941    */
942   public CacheStats getStats() {
943     return this.stats;
944   }
945
946   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
947       (4 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
948       (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
949       + ClassSize.OBJECT);
950
951   @Override
952   public long heapSize() {
953     return getCurrentSize();
954   }
955
956   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
957     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
958     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
959         ((long)Math.ceil(maxSize*1.2/blockSize)
960             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
961         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
962   }
963
964   @Override
965   public Iterator<CachedBlock> iterator() {
966     final Iterator<LruCachedBlock> iterator = map.values().iterator();
967
968     return new Iterator<CachedBlock>() {
969       private final long now = System.nanoTime();
970
971       @Override
972       public boolean hasNext() {
973         return iterator.hasNext();
974       }
975
976       @Override
977       public CachedBlock next() {
978         final LruCachedBlock b = iterator.next();
979         return new CachedBlock() {
980           @Override
981           public String toString() {
982             return BlockCacheUtil.toString(this, now);
983           }
984
985           @Override
986           public BlockPriority getBlockPriority() {
987             return b.getPriority();
988           }
989
990           @Override
991           public BlockType getBlockType() {
992             return b.getBuffer().getBlockType();
993           }
994
995           @Override
996           public long getOffset() {
997             return b.getCacheKey().getOffset();
998           }
999
1000           @Override
1001           public long getSize() {
1002             return b.getBuffer().heapSize();
1003           }
1004
1005           @Override
1006           public long getCachedTime() {
1007             return b.getCachedTime();
1008           }
1009
1010           @Override
1011           public String getFilename() {
1012             return b.getCacheKey().getHfileName();
1013           }
1014
1015           @Override
1016           public int compareTo(CachedBlock other) {
1017             int diff = this.getFilename().compareTo(other.getFilename());
1018             if (diff != 0) return diff;
1019             diff = Long.compare(this.getOffset(), other.getOffset());
1020             if (diff != 0) return diff;
1021             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1022               throw new IllegalStateException("" + this.getCachedTime() + ", " +
1023                 other.getCachedTime());
1024             }
1025             return Long.compare(other.getCachedTime(), this.getCachedTime());
1026           }
1027
1028           @Override
1029           public int hashCode() {
1030             return b.hashCode();
1031           }
1032
1033           @Override
1034           public boolean equals(Object obj) {
1035             if (obj instanceof CachedBlock) {
1036               CachedBlock cb = (CachedBlock)obj;
1037               return compareTo(cb) == 0;
1038             } else {
1039               return false;
1040             }
1041           }
1042         };
1043       }
1044
1045       @Override
1046       public void remove() {
1047         throw new UnsupportedOperationException();
1048       }
1049     };
1050   }
1051
1052   // Simple calculators of sizes given factors and maxSize
1053
1054   long acceptableSize() {
1055     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1056   }
1057   private long minSize() {
1058     return (long)Math.floor(this.maxSize * this.minFactor);
1059   }
1060   private long singleSize() {
1061     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1062   }
1063   private long multiSize() {
1064     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1065   }
1066   private long memorySize() {
1067     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1068   }
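
  // Note that each per-priority chunk is scaled by minFactor, so the three chunks together
  // sum to minSize(), the post-eviction target, rather than to maxSize.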
1069
1070   public void shutdown() {
1071     if (victimHandler != null)
1072       victimHandler.shutdown();
1073     this.scheduleThreadPool.shutdown();
1074     for (int i = 0; i < 10; i++) {
1075       if (!this.scheduleThreadPool.isShutdown()) {
1076         try {
1077           Thread.sleep(10);
1078         } catch (InterruptedException e) {
1079           LOG.warn("Interrupted while sleeping");
1080           Thread.currentThread().interrupt();
1081           break;
1082         }
1083       }
1084     }
1085
1086     if (!this.scheduleThreadPool.isShutdown()) {
1087       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1088       LOG.debug("Still running " + runnables);
1089     }
1090     this.evictionThread.shutdown();
1091   }
1092
1093   /** Clears the cache. Used in tests. */
1094   @VisibleForTesting
1095   public void clearCache() {
1096     this.map.clear();
1097     this.elements.set(0);
1098   }
1099
1100   /**
1101    * Used in testing. May be very inefficient.
1102    * @return the set of cached file names
1103    */
1104   @VisibleForTesting
1105   SortedSet<String> getCachedFileNamesForTest() {
1106     SortedSet<String> fileNames = new TreeSet<String>();
1107     for (BlockCacheKey cacheKey : map.keySet()) {
1108       fileNames.add(cacheKey.getHfileName());
1109     }
1110     return fileNames;
1111   }
1112
1113   @VisibleForTesting
1114   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1115     Map<BlockType, Integer> counts =
1116         new EnumMap<BlockType, Integer>(BlockType.class);
1117     for (LruCachedBlock cb : map.values()) {
1118       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1119       Integer count = counts.get(blockType);
1120       counts.put(blockType, (count == null ? 0 : count) + 1);
1121     }
1122     return counts;
1123   }
1124
1125   @VisibleForTesting
1126   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1127     Map<DataBlockEncoding, Integer> counts =
1128         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1129     for (LruCachedBlock block : map.values()) {
1130       DataBlockEncoding encoding =
1131               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1132       Integer count = counts.get(encoding);
1133       counts.put(encoding, (count == null ? 0 : count) + 1);
1134     }
1135     return counts;
1136   }
1137
1138   public void setVictimCache(BlockCache handler) {
1139     assert victimHandler == null;
1140     victimHandler = handler;
1141   }
1142
1143   @VisibleForTesting
1144   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1145     return map;
1146   }
1147
1148   BlockCache getVictimHandler() {
1149     return this.victimHandler;
1150   }
1151
1152   @Override
1153   public BlockCache[] getBlockCaches() {
1154     return null;
1155   }
1156
1157   @Override
1158   public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1159     // There is no SHARED type here. Just return
1160   }
1161 }