View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Objects;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory families 
61   * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 
62   * family is a column family that should be served from memory if possible):
63   * single-access, multiple-accesses, and in-memory priority.
64   * A block is added with an in-memory priority flag if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
66   *  single access priority the first time it is read into this block cache.  If a block is
67   *  accessed again while in cache, it is marked as a multiple access priority block.  This
68   *  delineation of blocks is used to prevent scans from thrashing the cache adding a 
69   *  least-frequently-used element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then while scanning determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (would be 3 times bytes
93   * to free).  It then uses the priority chunk sizes to evict fairly according
94   * to the relative sizes and usage.
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
  private static final Log LOG = LogFactory.getLog(LruBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   */
  static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache (no evictions if size < acceptable)
   */
  static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";

  /** Fraction of the cache reserved for single-access priority blocks. */
  static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
  /** Fraction of the cache reserved for multiple-access priority blocks. */
  static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
  /** Fraction of the cache reserved for in-memory priority blocks. */
  static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key to force data-block always (except in-memory are too much)
   * cached in memory for in-memory hfile, unlike inMemory, which is a column-family
   * configuration, inMemoryForceMode is a cluster-wide configuration
   */
  static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";

  /** Default Configuration Parameters*/

  /** Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /** Eviction thresholds */
  static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /** Priority buckets: default shares must total 1.0 (validated in the constructor). */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /** Statistics thread period, in seconds (a stats line is logged every five minutes). */
  static final int statThreadPeriod = 60 * 5;
  /** Blocks whose heapSize() exceeds this limit are rejected by cacheBlock(). */
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  /** Concurrent map (the cache) */
  private final Map<BlockCacheKey,LruCachedBlock> map;

  /** Eviction lock (locked when eviction in process) */
  private final ReentrantLock evictionLock = new ReentrantLock(true);
  /** Largest cacheable block, in bytes; from {@code hbase.lru.max.block.size}. */
  private final long maxBlockSize;

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread; null when the cache was constructed with evictionThread == false. */
  private final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache, in bytes; starts at the structural overhead, not zero. */
  private final AtomicLong size;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Cache access count (sequential ID); stamps blocks for LRU ordering. */
  private final AtomicLong count;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Overhead of the structure itself */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /** Where to send victims (blocks evicted/missing from the cache) */
  private BlockCache victimHandler = null;
204 
  /**
   * Default constructor.  Specify maximum size and expected average block
   * size (approximation is fine).
   *
   * <p>All other factors will be calculated based on defaults specified in
   * this class.  The eviction thread is started.
   * @param maxSize maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }
217 
218   /**
219    * Constructor used for testing.  Allows disabling of the eviction thread.
220    */
221   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
222     this(maxSize, blockSize, evictionThread,
223         (int)Math.ceil(1.2*maxSize/blockSize),
224         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
225         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
226         DEFAULT_SINGLE_FACTOR,
227         DEFAULT_MULTI_FACTOR,
228         DEFAULT_MEMORY_FACTOR,
229         false,
230         DEFAULT_MAX_BLOCK_SIZE
231         );
232   }
233 
  /**
   * Constructor that reads the eviction thresholds, bucket percentages,
   * in-memory force mode and max block size from the given Configuration,
   * falling back to the class defaults.
   * @param maxSize maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   * @param evictionThread whether to run evictions in a bg thread or not
   * @param conf source of the LRU tuning settings
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread,
        (int)Math.ceil(1.2*maxSize/blockSize),
        DEFAULT_LOAD_FACTOR,
        DEFAULT_CONCURRENCY_LEVEL,
        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
        );
  }
248 
  /**
   * Convenience constructor: same as the Configuration-based constructor with
   * the eviction thread enabled.
   * @param maxSize maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   * @param conf source of the LRU tuning settings
   */
  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }
252 
253   /**
254    * Configurable constructor.  Use this constructor if not using defaults.
255    * @param maxSize maximum size of this cache, in bytes
256    * @param blockSize expected average size of blocks, in bytes
257    * @param evictionThread whether to run evictions in a bg thread or not
258    * @param mapInitialSize initial size of backing ConcurrentHashMap
259    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
260    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
261    * @param minFactor percentage of total size that eviction will evict until
262    * @param acceptableFactor percentage of total size that triggers eviction
263    * @param singleFactor percentage of total size for single-access blocks
264    * @param multiFactor percentage of total size for multiple-access blocks
265    * @param memoryFactor percentage of total size for in-memory blocks
266    */
267   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
268       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
269       float minFactor, float acceptableFactor, float singleFactor,
270       float multiFactor, float memoryFactor, boolean forceInMemory, long maxBlockSize) {
271     this.maxBlockSize = maxBlockSize;
272     if(singleFactor + multiFactor + memoryFactor != 1 ||
273         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
274       throw new IllegalArgumentException("Single, multi, and memory factors " +
275           " should be non-negative and total 1.0");
276     }
277     if(minFactor >= acceptableFactor) {
278       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
279     }
280     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
281       throw new IllegalArgumentException("all factors must be < 1");
282     }
283     this.maxSize = maxSize;
284     this.blockSize = blockSize;
285     this.forceInMemory = forceInMemory;
286     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
287         mapLoadFactor, mapConcurrencyLevel);
288     this.minFactor = minFactor;
289     this.acceptableFactor = acceptableFactor;
290     this.singleFactor = singleFactor;
291     this.multiFactor = multiFactor;
292     this.memoryFactor = memoryFactor;
293     this.stats = new CacheStats(this.getClass().getSimpleName());
294     this.count = new AtomicLong(0);
295     this.elements = new AtomicLong(0);
296     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
297     this.size = new AtomicLong(this.overhead);
298     if(evictionThread) {
299       this.evictionThread = new EvictionThread(this);
300       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
301     } else {
302       this.evictionThread = null;
303     }
304     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
305     // every five minutes.
306     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
307         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
308   }
309 
310   @Override
311   public void setMaxSize(long maxSize) {
312     this.maxSize = maxSize;
313     if(this.size.get() > acceptableSize() && !evictionInProgress) {
314       runEviction();
315     }
316   }
317 
318   // BlockCache implementation
319 
  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   * @param cacheKey block's cache key
   * @param buf block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 unused by this implementation; presumably consumed by other
   *          BlockCache implementations — NOTE(review): confirm against the interface
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {

    // Reject over-sized blocks outright rather than letting a few huge blocks
    // dominate the cache.
    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too
      // big this can make the logs way too noisy.
      // So we log 2% (one in fifty rejections).
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block "
            + cacheKey.getHfileName() + " @ "
            + cacheKey.getOffset()
            + " is " + buf.heapSize()
            + " which is larger than " + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null) {
      // compare the contents, if they are not equal, we are in big trouble
      if (compare(buf, cb.getBuffer()) != 0) {
        throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
      }
      String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
      msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
      LOG.warn(msg);
      return;
    }
    // New entry: stamp it with the next sequential access ID so LRU ordering sees it
    // as the most recent arrival.
    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    // Size is accounted before the map insert; any transient skew is only inspected
    // by assertCounterSanity() under TRACE logging.
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }
372 
373   /**
374    * Sanity-checking for parity between actual block cache content and metrics.
375    * Intended only for use with TRACE level logging and -ea JVM.
376    */
377   private static void assertCounterSanity(long mapSize, long counterVal) {
378     if (counterVal < 0) {
379       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
380         ", mapSize=" + mapSize);
381       return;
382     }
383     if (mapSize < Integer.MAX_VALUE) {
384       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
385       if (pct_diff > 0.05) {
386         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
387           ", mapSize=" + mapSize);
388       }
389     }
390   }
391 
392   private int compare(Cacheable left, Cacheable right) {
393     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
394     left.serialize(l);
395     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
396     right.serialize(r);
397     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
398       r.array(), r.arrayOffset(), r.limit());
399   }
400 
401   /**
402    * Cache the block with the specified name and buffer.
403    * <p>
404    * @param cacheKey block's cache key
405    * @param buf block buffer
406    */
407   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
408     cacheBlock(cacheKey, buf, false, false);
409   }
410 
411   /**
412    * Helper function that updates the local size counter and also updates any
413    * per-cf or per-blocktype metrics it can discern from given
414    * {@link LruCachedBlock}
415    *
416    * @param cb
417    * @param evict
418    */
419   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
420     long heapsize = cb.heapSize();
421     if (evict) {
422       heapsize *= -1;
423     }
424     return size.addAndGet(heapsize);
425   }
426 
  /**
   * Get the buffer of the block with the specified name.
   * @param cacheKey block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   *        (used to avoid double counting cache misses when doing double-check locking)
   * @param updateCacheMetrics Whether to update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    LruCachedBlock cb = map.get(cacheKey);
    if (cb == null) {
      // Count the miss only once per logical lookup: the repeat leg of
      // double-checked locking was already counted on the first leg.
      if (!repeat && updateCacheMetrics) {
        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
      }
      // If there is another block cache then try and read there.
      // However if this is a retry ( second time in double checked locking )
      // And it's already a miss then the l2 will also be a miss.
      if (victimHandler != null && !repeat) {
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);

        // Promote this to L1.
        if (result != null && caching) {
          cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
    // Touch the block with a fresh, monotonically increasing access ID so LRU
    // ordering (and SINGLE -> MULTI promotion, handled in the block) sees this hit.
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }
462 
  /**
   * Whether the cache contains block with specified cacheKey
   * @param cacheKey the key to probe
   * @return true if contains the block
   */
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }
471 
472   @Override
473   public boolean evictBlock(BlockCacheKey cacheKey) {
474     LruCachedBlock cb = map.get(cacheKey);
475     if (cb == null) return false;
476     return evictBlock(cb, false) > 0;
477   }
478
479   /**
480    * Evicts all blocks for a specific HFile. This is an
481    * expensive operation implemented as a linear-time search through all blocks
482    * in the cache. Ideally this should be a search in a log-access-time map.
483    *
484    * <p>
485    * This is used for evict-on-close to remove all blocks of a specific HFile.
486    *
487    * @return the number of blocks evicted
488    */
489   @Override
490   public int evictBlocksByHfileName(String hfileName) {
491     int numEvicted = 0;
492     for (BlockCacheKey key : map.keySet()) {
493       if (key.getHfileName().equals(hfileName)) {
494         if (evictBlock(key))
495           ++numEvicted;
496       }
497     }
498     if (victimHandler != null) {
499       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
500     }
501     return numEvicted;
502   }
503
  /**
   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
   * block may be read again later
   * @param block the block to evict
   * @param evictedByEvictionProcess true if the given block is evicted by
   *          EvictionThread
   * @return the heap size of evicted block
   */
  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
    // remove() doubles as the found/not-found check; under concurrent eviction of
    // the same key exactly one caller observes found == true and does the accounting.
    boolean found = map.remove(block.getCacheKey()) != null;
    if (!found) {
      return 0;
    }
    updateSizeMetrics(block, true);
    long val = elements.decrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
    // Blocks pushed out by the eviction process (as opposed to explicit eviction,
    // e.g. evict-on-close) may be read again, so hand them down to the L2 victim
    // cache when one is configured.
    if (evictedByEvictionProcess && victimHandler != null) {
      if (victimHandler instanceof BucketCache) {
        // Only block on the BucketCache write path while this cache is still below
        // its acceptable size; once over, drop rather than stall eviction.
        boolean wait = getCurrentSize() < acceptableSize();
        boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
        ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
            inMemory, wait);
      } else {
        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
      }
    }
    return block.heapSize();
  }
536
537   /**
538    * Multi-threaded call to run the eviction process.
539    */
540   private void runEviction() {
541     if(evictionThread == null) {
542       evict();
543     } else {
544       evictionThread.evict();
545     }
546   }
547
  /** @return true while an eviction pass is currently running (test hook). */
  @VisibleForTesting
  boolean isEvictionInProgress() {
    return evictionInProgress;
  }

  /** @return the fixed structural overhead, in bytes, computed at construction (test hook). */
  @VisibleForTesting
  long getOverhead() {
    return overhead;
  }
557
  /**
   * Eviction method.  Frees enough bytes to bring the cache back down to the
   * minimum watermark, taking from the three priority buckets in proportion to
   * how far each exceeds its configured share (or, in in-memory-force mode,
   * draining single/multi before touching the memory bucket).
   */
  void evict() {

    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      // Already at or below the minimum watermark; nothing to do.
      if(bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
          memorySize());

      // Scan entire map putting into appropriate buckets
      for(LruCachedBlock cachedBlock : map.values()) {
        switch(cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      // In force mode (or when the memory bucket owns essentially the whole cache),
      // protect the in-memory bucket: drain single/multi first.
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // Normal mode: repeatedly take from whichever bucket most exceeds its
        // configured share, splitting the remaining quota across the buckets
        // still in the queue (ordered least-overflow first; see compareTo).
        PriorityQueue<BlockBucket> bucketQueue =
          new PriorityQueue<BlockBucket>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = 3;

        BlockBucket bucket;
        while((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if(overflow > 0) {
            long bucketBytesToFree = Math.min(overflow,
                (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
682
683   @Override
684   public String toString() {
685     return Objects.toStringHelper(this)
686       .add("blockCount", getBlockCount())
687       .add("currentSize", getCurrentSize())
688       .add("freeSize", getFreeSize())
689       .add("maxSize", getMaxSize())
690       .add("heapSize", heapSize())
691       .add("minSize", minSize())
692       .add("minFactor", minFactor)
693       .add("multiSize", multiSize())
694       .add("multiFactor", multiFactor)
695       .add("singleSize", singleSize())
696       .add("singleFactor", singleFactor)
697       .toString();
698   }
699
700   /**
701    * Used to group blocks into priority buckets.  There will be a BlockBucket
702    * for each priority (single, multi, memory).  Once bucketed, the eviction
703    * algorithm takes the appropriate number of elements out of each according
704    * to configuration parameters and their relatives sizes.
705    */
706   private class BlockBucket implements Comparable<BlockBucket> {
707
708     private final String name;
709     private LruCachedBlockQueue queue;
710     private long totalSize = 0;
711     private long bucketSize;
712
713     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
714       this.name = name;
715       this.bucketSize = bucketSize;
716       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
717       totalSize = 0;
718     }
719
720     public void add(LruCachedBlock block) {
721       totalSize += block.heapSize();
722       queue.add(block);
723     }
724
725     public long free(long toFree) {
726       if (LOG.isTraceEnabled()) {
727         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
728       }
729       LruCachedBlock cb;
730       long freedBytes = 0;
731       while ((cb = queue.pollLast()) != null) {
732         freedBytes += evictBlock(cb, true);
733         if (freedBytes >= toFree) {
734           return freedBytes;
735         }
736       }
737       if (LOG.isTraceEnabled()) {
738         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
739       }
740       return freedBytes;
741     }
742
743     public long overflow() {
744       return totalSize - bucketSize;
745     }
746
747     public long totalSize() {
748       return totalSize;
749     }
750
751     public int compareTo(BlockBucket that) {
752       return Long.compare(this.overflow(), that.overflow());
753     }
754
755     @Override
756     public boolean equals(Object that) {
757       if (that == null || !(that instanceof BlockBucket)){
758         return false;
759       }
760       return compareTo((BlockBucket)that) == 0;
761     }
762
763     @Override
764     public int hashCode() {
765       return Objects.hashCode(name, bucketSize, queue, totalSize);
766     }
767
768     @Override
769     public String toString() {
770       return Objects.toStringHelper(this)
771         .add("name", name)
772         .add("totalSize", StringUtils.byteDesc(totalSize))
773         .add("bucketSize", StringUtils.byteDesc(bucketSize))
774         .toString();
775     }
776   }
777
778   /**
779    * Get the maximum size of this cache.
780    * @return max size in bytes
781    */
782   public long getMaxSize() {
783     return this.maxSize;
784   }
785
786   @Override
787   public long getCurrentSize() {
788     return this.size.get();
789   }
790
791   @Override
792   public long getFreeSize() {
793     return getMaxSize() - getCurrentSize();
794   }
795
796   @Override
797   public long size() {
798     return getMaxSize();
799   }
800
801   @Override
802   public long getBlockCount() {
803     return this.elements.get();
804   }
805
806   EvictionThread getEvictionThread() {
807     return this.evictionThread;
808   }
809
810   /*
811    * Eviction thread.  Sits in waiting state until an eviction is triggered
812    * when the cache size grows above the acceptable level.<p>
813    *
814    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
815    */
816   static class EvictionThread extends HasThread {
817     private WeakReference<LruBlockCache> cache;
818     private volatile boolean go = true;
819     // flag set after enter the run method, used for test
820     private boolean enteringRun = false;
821
822     public EvictionThread(LruBlockCache cache) {
823       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
824       setDaemon(true);
825       this.cache = new WeakReference<LruBlockCache>(cache);
826     }
827
828     @Override
829     public void run() {
830       enteringRun = true;
831       while (this.go) {
832         synchronized(this) {
833           try {
834             this.wait(1000 * 10/*Don't wait for ever*/);
835           } catch(InterruptedException e) {
836             LOG.warn("Interrupted eviction thread ", e);
837             Thread.currentThread().interrupt();
838           }
839         }
840         LruBlockCache cache = this.cache.get();
841         if (cache == null) break;
842         cache.evict();
843       }
844     }
845
846     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
847         justification="This is what we want")
848     public void evict() {
849       synchronized(this) {
850         this.notifyAll();
851       }
852     }
853
854     synchronized void shutdown() {
855       this.go = false;
856       this.notifyAll();
857     }
858
859     /**
860      * Used for the test.
861      */
862     boolean isEnteringRun() {
863       return this.enteringRun;
864     }
865   }
866
867   /*
868    * Statistics thread.  Periodically prints the cache statistics to the log.
869    */
870   static class StatisticsThread extends Thread {
871     private final LruBlockCache lru;
872
873     public StatisticsThread(LruBlockCache lru) {
874       super("LruBlockCacheStats");
875       setDaemon(true);
876       this.lru = lru;
877     }
878
879     @Override
880     public void run() {
881       lru.logStats();
882     }
883   }
884
885   public void logStats() {
886     // Log size
887     long totalSize = heapSize();
888     long freeSize = maxSize - totalSize;
889     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
890         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
891         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
892         "blockCount=" + getBlockCount() + ", " +
893         "accesses=" + stats.getRequestCount() + ", " +
894         "hits=" + stats.getHitCount() + ", " +
895         "hitRatio=" + (stats.getHitCount() == 0 ?
896           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
897         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
898         "cachingHits=" + stats.getHitCachingCount() + ", " +
899         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
900           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
901         "evictions=" + stats.getEvictionCount() + ", " +
902         "evicted=" + stats.getEvictedCount() + ", " +
903         "evictedPerRun=" + stats.evictedPerEviction());
904   }
905
906   /**
907    * Get counter statistics for this cache.
908    *
909    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
910    * of the eviction processes.
911    */
912   public CacheStats getStats() {
913     return this.stats;
914   }
915
  // Fixed per-instance heap overhead: object header plus the class's primitive and
  // reference fields. NOTE(review): the counts (3 longs, 10 references, 5 floats,
  // 2 booleans) must mirror the actual field declarations, which are outside this
  // view -- re-verify whenever fields are added or removed.
  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (3 * Bytes.SIZEOF_LONG) + (10 * ClassSize.REFERENCE) +
      (5 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
      + ClassSize.OBJECT);
920
921   @Override
922   public long heapSize() {
923     return getCurrentSize();
924   }
925
926   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
927     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
928     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
929         ((long)Math.ceil(maxSize*1.2/blockSize)
930             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
931         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
932   }
933
  /**
   * Returns an iterator over read-only views of the cached blocks. The views are
   * created lazily from the live map, so contents reflect the cache at traversal time.
   * Removal through the iterator is not supported.
   */
  @Override
  public Iterator<CachedBlock> iterator() {
    final Iterator<LruCachedBlock> iterator = map.values().iterator();

    return new Iterator<CachedBlock>() {
      // Snapshot of "now" taken once, so ages reported by toString() share a baseline.
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public CachedBlock next() {
        final LruCachedBlock b = iterator.next();
        // Wrap the internal LruCachedBlock in the public CachedBlock view,
        // delegating each accessor to the underlying block.
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return b.getPriority();
          }

          @Override
          public BlockType getBlockType() {
            return b.getBuffer().getBlockType();
          }

          @Override
          public long getOffset() {
            return b.getCacheKey().getOffset();
          }

          @Override
          public long getSize() {
            return b.getBuffer().heapSize();
          }

          @Override
          public long getCachedTime() {
            return b.getCachedTime();
          }

          @Override
          public String getFilename() {
            return b.getCacheKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            // Order by file name, then offset; ties broken by cached time,
            // NEWEST first (note the reversed argument order below).
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            // Negative cached times are treated as corrupt state; fail loudly.
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return b.hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            // NOTE(review): equals is compareTo-based while hashCode delegates to the
            // wrapped block, so equal views may hash differently -- TODO confirm no
            // hash-based collection stores these views.
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock)obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
1021
1022   // Simple calculators of sizes given factors and maxSize
1023
1024   long acceptableSize() {
1025     return (long)Math.floor(this.maxSize * this.acceptableFactor);
1026   }
1027   private long minSize() {
1028     return (long)Math.floor(this.maxSize * this.minFactor);
1029   }
1030   private long singleSize() {
1031     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1032   }
1033   private long multiSize() {
1034     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1035   }
1036   private long memorySize() {
1037     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1038   }
1039
1040   public void shutdown() {
1041     if (victimHandler != null)
1042       victimHandler.shutdown();
1043     this.scheduleThreadPool.shutdown();
1044     for (int i = 0; i < 10; i++) {
1045       if (!this.scheduleThreadPool.isShutdown()) {
1046         try {
1047           Thread.sleep(10);
1048         } catch (InterruptedException e) {
1049           LOG.warn("Interrupted while sleeping");
1050           Thread.currentThread().interrupt();
1051           break;
1052         }
1053       }
1054     }
1055
1056     if (!this.scheduleThreadPool.isShutdown()) {
1057       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1058       LOG.debug("Still running " + runnables);
1059     }
1060     this.evictionThread.shutdown();
1061   }
1062
1063   /** Clears the cache. Used in tests. */
1064   @VisibleForTesting
1065   public void clearCache() {
1066     this.map.clear();
1067     this.elements.set(0);
1068   }
1069
1070   /**
1071    * Used in testing. May be very inefficient.
1072    * @return the set of cached file names
1073    */
1074   @VisibleForTesting
1075   SortedSet<String> getCachedFileNamesForTest() {
1076     SortedSet<String> fileNames = new TreeSet<String>();
1077     for (BlockCacheKey cacheKey : map.keySet()) {
1078       fileNames.add(cacheKey.getHfileName());
1079     }
1080     return fileNames;
1081   }
1082
1083   @VisibleForTesting
1084   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1085     Map<BlockType, Integer> counts =
1086         new EnumMap<BlockType, Integer>(BlockType.class);
1087     for (LruCachedBlock cb : map.values()) {
1088       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1089       Integer count = counts.get(blockType);
1090       counts.put(blockType, (count == null ? 0 : count) + 1);
1091     }
1092     return counts;
1093   }
1094
1095   @VisibleForTesting
1096   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1097     Map<DataBlockEncoding, Integer> counts =
1098         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1099     for (LruCachedBlock block : map.values()) {
1100       DataBlockEncoding encoding =
1101               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1102       Integer count = counts.get(encoding);
1103       counts.put(encoding, (count == null ? 0 : count) + 1);
1104     }
1105     return counts;
1106   }
1107
  /**
   * Installs the victim cache blocks are spilled to on eviction. May only be set once;
   * the guard is an assert, so it is enforced only when assertions are enabled (-ea) --
   * NOTE(review): with -da a second call silently replaces the handler.
   */
  public void setVictimCache(BlockCache handler) {
    assert victimHandler == null;
    victimHandler = handler;
  }
1112
1113   @VisibleForTesting
1114   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1115     return map;
1116   }
1117
1118   BlockCache getVictimHandler() {
1119     return this.victimHandler;
1120   }
1121
1122   @Override
1123   public BlockCache[] getBlockCaches() {
1124     return null;
1125   }
1126
1127   @Override
1128   public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1129     // There is no SHARED type here. Just return
1130   }
1131 }