001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.lang.ref.WeakReference;
021import java.util.EnumMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.PriorityQueue;
026import java.util.SortedSet;
027import java.util.TreeSet;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ScheduledExecutorService;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.atomic.LongAdder;
034import java.util.concurrent.locks.ReentrantLock;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.io.HeapSize;
037import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.ClassSize;
040import org.apache.hadoop.hbase.util.HasThread;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
048import org.apache.hbase.thirdparty.com.google.common.base.Objects;
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
050
051/**
052 * A block cache implementation that is memory-aware using {@link HeapSize},
053 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
054 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
055 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
056 *
057 * Contains three levels of block priority to allow for scan-resistance and in-memory families
058 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
059 * family is a column family that should be served from memory if possible):
060 * single-access, multiple-accesses, and in-memory priority.
061 * A block is added with an in-memory priority flag if
062 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
 * single-access priority the first time it is read into this block cache.  If a block is
 * accessed again while in cache, it is marked as a multiple-access priority block.  This
 * delineation of blocks prevents scans from thrashing the cache by adding a
 * least-frequently-used element to the eviction algorithm.<p>
067 *
 * Each priority is given its own chunk of the total cache to ensure
 * fairness during eviction.  Each priority will retain close to its maximum
 * size; however, if any priority is not using its entire chunk, the others
 * are able to grow beyond their chunk size.<p>
072 *
073 * Instantiated at a minimum with the total size and average block size.
074 * All sizes are in bytes.  The block size is not especially important as this
075 * cache is fully dynamic in its sizing of blocks.  It is only used for
076 * pre-allocating data structures and in initial heap estimation of the map.<p>
077 *
078 * The detailed constructor defines the sizes for the three priorities (they
079 * should total to the <code>maximum size</code> defined).  It also sets the levels that
080 * trigger and control the eviction thread.<p>
081 *
082 * The <code>acceptable size</code> is the cache size level which triggers the eviction
083 * process to start.  It evicts enough blocks to get the size below the
084 * minimum size specified.<p>
085 *
 * Eviction normally happens in a separate thread (or inline in the caller when the
 * eviction thread is disabled) and involves a single full scan of the map.  It determines
 * how many bytes must be freed to reach the minimum size and then, while scanning, collects
 * the fewest least-recently-used blocks necessary from each of the three priorities (which
 * amounts to up to three times the bytes to free).  It then uses the priority chunk sizes to
 * evict fairly according to the relative sizes and usage.
092 */
093@InterfaceAudience.Private
094public class LruBlockCache implements ResizableBlockCache, HeapSize {
095
  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   * Must be smaller than the acceptable factor and below 1.0.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache, as a percentage of total size (no evictions if size < acceptable).
   * Must be below 1.0.
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of cache, as a multiple of the acceptable size: any put is rejected (and
   * an eviction is requested instead) once size >= this * acceptable.
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.hard.capacity.limit.factor";
  // Shares of the total size given to the single-access, multi-access and in-memory priority
  // buckets. The three values must be non-negative and sum to exactly 1.0.
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key that gives in-memory blocks priority over all other blocks during
   * eviction: when enabled, in-memory blocks are evicted only after the single-access and
   * multi-access buckets have been emptied. Unlike inMemory, which is a column-family
   * configuration, inMemoryForceMode is a cluster-wide configuration.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
      "hbase.lru.rs.inmemoryforcemode";

  /* Default configuration parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread: period between statistics runs, in seconds */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  // Blocks whose heap size exceeds this many bytes are never cached.
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
153
  /** Concurrent map (the cache), keyed by block cache key */
  private transient final Map<BlockCacheKey, LruCachedBlock> map;

  /**
   * Eviction lock (locked when eviction in process). A fair lock; evict() only uses tryLock(),
   * so a second concurrent eviction request returns immediately instead of waiting.
   */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Blocks with a heap size above this many bytes are never cached */
  private final long maxBlockSize;

  /**
   * Volatile boolean to track if we are in an eviction process or not. Read without taking the
   * eviction lock so callers can skip requesting a redundant eviction.
   */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread; {@code null} when evictions run inline in the triggering thread */
  private transient final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache, in bytes; initialised to the structural overhead */
  private final AtomicLong size;

  /** Current size of data blocks, in bytes */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /** Cache access count (sequential ID), stamped on blocks when cached and when accessed */
  private final AtomicLong count;

  /** Hard capacity limit: puts are rejected once size >= this * acceptable size */
  private float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size, in bytes */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Share of the cache for the single-access bucket */
  private float singleFactor;

  /** Share of the cache for the multiple-access bucket */
  private float multiFactor;

  /** Share of the cache for the in-memory bucket */
  private float memoryFactor;

  /** Overhead of the structure itself, in bytes; counted in {@link #size} */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
   * external cache as L2: misses in this cache are looked up there, and blocks evicted by the
   * eviction process are offered to it.
   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;
227
  /**
   * Default constructor.  Specify maximum size and expected average block
   * size (approximation is fine).
   *
   * <p>All other factors will be calculated based on defaults specified in
   * this class, and the background eviction thread is started.
   *
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }
241
242  /**
243   * Constructor used for testing.  Allows disabling of the eviction thread.
244   */
245  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
246    this(maxSize, blockSize, evictionThread,
247        (int) Math.ceil(1.2 * maxSize / blockSize),
248        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
249        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
250        DEFAULT_SINGLE_FACTOR,
251        DEFAULT_MULTI_FACTOR,
252        DEFAULT_MEMORY_FACTOR,
253        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
254        false,
255        DEFAULT_MAX_BLOCK_SIZE
256        );
257  }
258
259  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
260    this(maxSize, blockSize, evictionThread,
261        (int) Math.ceil(1.2 * maxSize / blockSize),
262        DEFAULT_LOAD_FACTOR,
263        DEFAULT_CONCURRENCY_LEVEL,
264        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
265        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
266        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
267        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
268        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
269        conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
270                      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
271        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
272        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
273    );
274  }
275
  /**
   * Creates a cache configured from {@code conf} with the background eviction thread enabled.
   *
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   * @param conf      configuration holding the LRU tuning keys
   */
  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }
279
280  /**
281   * Configurable constructor.  Use this constructor if not using defaults.
282   *
283   * @param maxSize             maximum size of this cache, in bytes
284   * @param blockSize           expected average size of blocks, in bytes
285   * @param evictionThread      whether to run evictions in a bg thread or not
286   * @param mapInitialSize      initial size of backing ConcurrentHashMap
287   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
288   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
289   * @param minFactor           percentage of total size that eviction will evict until
290   * @param acceptableFactor    percentage of total size that triggers eviction
291   * @param singleFactor        percentage of total size for single-access blocks
292   * @param multiFactor         percentage of total size for multiple-access blocks
293   * @param memoryFactor        percentage of total size for in-memory blocks
294   */
295  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
296      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
297      float minFactor, float acceptableFactor, float singleFactor,
298      float multiFactor, float memoryFactor, float hardLimitFactor,
299      boolean forceInMemory, long maxBlockSize) {
300    this.maxBlockSize = maxBlockSize;
301    if(singleFactor + multiFactor + memoryFactor != 1 ||
302        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
303      throw new IllegalArgumentException("Single, multi, and memory factors " +
304          " should be non-negative and total 1.0");
305    }
306    if (minFactor >= acceptableFactor) {
307      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
308    }
309    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
310      throw new IllegalArgumentException("all factors must be < 1");
311    }
312    this.maxSize = maxSize;
313    this.blockSize = blockSize;
314    this.forceInMemory = forceInMemory;
315    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
316    this.minFactor = minFactor;
317    this.acceptableFactor = acceptableFactor;
318    this.singleFactor = singleFactor;
319    this.multiFactor = multiFactor;
320    this.memoryFactor = memoryFactor;
321    this.stats = new CacheStats(this.getClass().getSimpleName());
322    this.count = new AtomicLong(0);
323    this.elements = new AtomicLong(0);
324    this.dataBlockElements = new LongAdder();
325    this.dataBlockSize = new LongAdder();
326    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
327    this.size = new AtomicLong(this.overhead);
328    this.hardCapacityLimitFactor = hardLimitFactor;
329    if (evictionThread) {
330      this.evictionThread = new EvictionThread(this);
331      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
332    } else {
333      this.evictionThread = null;
334    }
335    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
336    // every five minutes.
337    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
338                                                STAT_THREAD_PERIOD, TimeUnit.SECONDS);
339  }
340
341  @Override
342  public void setMaxSize(long maxSize) {
343    this.maxSize = maxSize;
344    if (this.size.get() > acceptableSize() && !evictionInProgress) {
345      runEviction();
346    }
347  }
348
349  // BlockCache implementation
350
351  /**
352   * Cache the block with the specified name and buffer.
353   * <p>
354   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
355   * this can happen, for which we compare the buffer contents.
356   *
357   * @param cacheKey block's cache key
358   * @param buf      block buffer
359   * @param inMemory if block is in-memory
360   */
361  @Override
362  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
363    if (buf.heapSize() > maxBlockSize) {
364      // If there are a lot of blocks that are too
365      // big this can make the logs way too noisy.
366      // So we log 2%
367      if (stats.failInsert() % 50 == 0) {
368        LOG.warn("Trying to cache too large a block "
369            + cacheKey.getHfileName() + " @ "
370            + cacheKey.getOffset()
371            + " is " + buf.heapSize()
372            + " which is larger than " + maxBlockSize);
373      }
374      return;
375    }
376
377    LruCachedBlock cb = map.get(cacheKey);
378    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
379      return;
380    }
381    long currentSize = size.get();
382    long currentAcceptableSize = acceptableSize();
383    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
384    if (currentSize >= hardLimitSize) {
385      stats.failInsert();
386      if (LOG.isTraceEnabled()) {
387        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
388          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
389          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
390          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
391      }
392      if (!evictionInProgress) {
393        runEviction();
394      }
395      return;
396    }
397    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
398    long newSize = updateSizeMetrics(cb, false);
399    map.put(cacheKey, cb);
400    long val = elements.incrementAndGet();
401    if (buf.getBlockType().isData()) {
402       dataBlockElements.increment();
403    }
404    if (LOG.isTraceEnabled()) {
405      long size = map.size();
406      assertCounterSanity(size, val);
407    }
408    if (newSize > currentAcceptableSize && !evictionInProgress) {
409      runEviction();
410    }
411  }
412
413  /**
414   * Sanity-checking for parity between actual block cache content and metrics.
415   * Intended only for use with TRACE level logging and -ea JVM.
416   */
417  private static void assertCounterSanity(long mapSize, long counterVal) {
418    if (counterVal < 0) {
419      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
420        ", mapSize=" + mapSize);
421      return;
422    }
423    if (mapSize < Integer.MAX_VALUE) {
424      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
425      if (pct_diff > 0.05) {
426        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
427          ", mapSize=" + mapSize);
428      }
429    }
430  }
431
432  /**
433   * Cache the block with the specified name and buffer.
434   * <p>
435   *
436   * @param cacheKey block's cache key
437   * @param buf      block buffer
438   */
439  @Override
440  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
441    cacheBlock(cacheKey, buf, false);
442  }
443
444  /**
445   * Helper function that updates the local size counter and also updates any
446   * per-cf or per-blocktype metrics it can discern from given
447   * {@link LruCachedBlock}
448   */
449  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
450    long heapsize = cb.heapSize();
451    BlockType bt = cb.getBuffer().getBlockType();
452    if (evict) {
453      heapsize *= -1;
454    }
455    if (bt != null && bt.isData()) {
456       dataBlockSize.add(heapsize);
457    }
458    return size.addAndGet(heapsize);
459  }
460
461  /**
462   * Get the buffer of the block with the specified name.
463   *
464   * @param cacheKey           block's cache key
465   * @param caching            true if the caller caches blocks on cache misses
466   * @param repeat             Whether this is a repeat lookup for the same block
467   *                           (used to avoid double counting cache misses when doing double-check
468   *                           locking)
469   * @param updateCacheMetrics Whether to update cache metrics or not
470   *
471   * @return buffer of specified cache key, or null if not in cache
472   */
473  @Override
474  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
475      boolean updateCacheMetrics) {
476    LruCachedBlock cb = map.get(cacheKey);
477    if (cb == null) {
478      if (!repeat && updateCacheMetrics) {
479        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
480      }
481      // If there is another block cache then try and read there.
482      // However if this is a retry ( second time in double checked locking )
483      // And it's already a miss then the l2 will also be a miss.
484      if (victimHandler != null && !repeat) {
485        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
486
487        // Promote this to L1.
488        if (result != null && caching) {
489          if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
490            result = ((HFileBlock) result).deepClone();
491          }
492          cacheBlock(cacheKey, result, /* inMemory = */ false);
493        }
494        return result;
495      }
496      return null;
497    }
498    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
499    cb.access(count.incrementAndGet());
500    return cb.getBuffer();
501  }
502
503  /**
504   * Whether the cache contains block with specified cacheKey
505   *
506   * @return true if contains the block
507   */
508  public boolean containsBlock(BlockCacheKey cacheKey) {
509    return map.containsKey(cacheKey);
510  }
511
512  @Override
513  public boolean evictBlock(BlockCacheKey cacheKey) {
514    LruCachedBlock cb = map.get(cacheKey);
515    return cb != null && evictBlock(cb, false) > 0;
516  }
517
518  /**
519   * Evicts all blocks for a specific HFile. This is an
520   * expensive operation implemented as a linear-time search through all blocks
521   * in the cache. Ideally this should be a search in a log-access-time map.
522   *
523   * <p>
524   * This is used for evict-on-close to remove all blocks of a specific HFile.
525   *
526   * @return the number of blocks evicted
527   */
528  @Override
529  public int evictBlocksByHfileName(String hfileName) {
530    int numEvicted = 0;
531    for (BlockCacheKey key : map.keySet()) {
532      if (key.getHfileName().equals(hfileName)) {
533        if (evictBlock(key))
534          ++numEvicted;
535      }
536    }
537    if (victimHandler != null) {
538      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
539    }
540    return numEvicted;
541  }
542
543  /**
544   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
545   * block may be read again later
546   *
547   * @param evictedByEvictionProcess true if the given block is evicted by
548   *          EvictionThread
549   * @return the heap size of evicted block
550   */
551  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
552    boolean found = map.remove(block.getCacheKey()) != null;
553    if (!found) {
554      return 0;
555    }
556    updateSizeMetrics(block, true);
557    long val = elements.decrementAndGet();
558    if (LOG.isTraceEnabled()) {
559      long size = map.size();
560      assertCounterSanity(size, val);
561    }
562    if (block.getBuffer().getBlockType().isData()) {
563       dataBlockElements.decrement();
564    }
565    if (evictedByEvictionProcess) {
566      // When the eviction of the block happened because of invalidation of HFiles, no need to
567      // update the stats counter.
568      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
569      if (victimHandler != null) {
570        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
571      }
572    }
573    return block.heapSize();
574  }
575
576  /**
577   * Multi-threaded call to run the eviction process.
578   */
579  private void runEviction() {
580    if (evictionThread == null) {
581      evict();
582    } else {
583      evictionThread.evict();
584    }
585  }
586
587  @VisibleForTesting
588  boolean isEvictionInProgress() {
589    return evictionInProgress;
590  }
591
592  @VisibleForTesting
593  long getOverhead() {
594    return overhead;
595  }
596
  /**
   * Eviction method.
   * <p>
   * Frees enough blocks to bring the cache size down to {@code minSize()}. Only one eviction
   * runs at a time: if the eviction lock is already held, this returns immediately. Otherwise
   * every cached block is sorted into a single, multi or memory bucket by priority, and blocks
   * are then evicted from those buckets:
   * <ul>
   *   <li>When {@code forceInMemory} is set (or the memory factor is effectively 1.0), the
   *       memory bucket is touched only after the single and multi buckets have been emptied.
   *       Otherwise eviction aims for a 1:2 single:multi ratio in the bytes that remain.</li>
   *   <li>In all other cases, buckets are visited in increasing order of overflow (bytes above
   *       their allotted size). Each overflowing bucket is asked to free the smaller of its
   *       overflow and an even share of the bytes still to be freed.</li>
   * </ul>
   * The eviction statistic is incremented on every run that acquires the lock, including runs
   * that find nothing to free.
   */
  void evict() {

    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      // Already at or below the minimum size: nothing to do (the finally block still runs).
      if (bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      // (there is no default case: a block with any other priority is not bucketed).
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        // In-memory blocks take precedence: evict from single/multi first.
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket be 1:2
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks: shrink single to a third of what
            // remains, then take whatever is still owed from multi
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // Fair eviction: BlockBucket orders by overflow, so the least-overflowing bucket is
        // polled first and later buckets receive a larger share of whatever is still owed.
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            // Never take a bucket below its allotted size; split the outstanding bytes evenly
            // across the buckets not yet visited.
            long bucketBytesToFree =
                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      // Runs even on the early "nothing to free" return, so every locked run is counted.
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
717
718  @Override
719  public String toString() {
720    return MoreObjects.toStringHelper(this)
721      .add("blockCount", getBlockCount())
722      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
723      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
724      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
725      .add("heapSize", StringUtils.byteDesc(heapSize()))
726      .add("minSize", StringUtils.byteDesc(minSize()))
727      .add("minFactor", minFactor)
728      .add("multiSize", StringUtils.byteDesc(multiSize()))
729      .add("multiFactor", multiFactor)
730      .add("singleSize", StringUtils.byteDesc(singleSize()))
731      .add("singleFactor", singleFactor)
732      .toString();
733  }
734
735  /**
736   * Used to group blocks into priority buckets.  There will be a BlockBucket
737   * for each priority (single, multi, memory).  Once bucketed, the eviction
738   * algorithm takes the appropriate number of elements out of each according
739   * to configuration parameters and their relatives sizes.
740   */
741  private class BlockBucket implements Comparable<BlockBucket> {
742
743    private final String name;
744    private LruCachedBlockQueue queue;
745    private long totalSize = 0;
746    private long bucketSize;
747
748    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
749      this.name = name;
750      this.bucketSize = bucketSize;
751      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
752      totalSize = 0;
753    }
754
755    public void add(LruCachedBlock block) {
756      totalSize += block.heapSize();
757      queue.add(block);
758    }
759
760    public long free(long toFree) {
761      if (LOG.isTraceEnabled()) {
762        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
763      }
764      LruCachedBlock cb;
765      long freedBytes = 0;
766      while ((cb = queue.pollLast()) != null) {
767        freedBytes += evictBlock(cb, true);
768        if (freedBytes >= toFree) {
769          return freedBytes;
770        }
771      }
772      if (LOG.isTraceEnabled()) {
773        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
774      }
775      return freedBytes;
776    }
777
778    public long overflow() {
779      return totalSize - bucketSize;
780    }
781
782    public long totalSize() {
783      return totalSize;
784    }
785
786    @Override
787    public int compareTo(BlockBucket that) {
788      return Long.compare(this.overflow(), that.overflow());
789    }
790
791    @Override
792    public boolean equals(Object that) {
793      if (that == null || !(that instanceof BlockBucket)) {
794        return false;
795      }
796      return compareTo((BlockBucket)that) == 0;
797    }
798
799    @Override
800    public int hashCode() {
801      return Objects.hashCode(name, bucketSize, queue, totalSize);
802    }
803
804    @Override
805    public String toString() {
806      return MoreObjects.toStringHelper(this)
807        .add("name", name)
808        .add("totalSize", StringUtils.byteDesc(totalSize))
809        .add("bucketSize", StringUtils.byteDesc(bucketSize))
810        .toString();
811    }
812  }
813
814  /**
815   * Get the maximum size of this cache.
816   *
817   * @return max size in bytes
818   */
819
820  @Override
821  public long getMaxSize() {
822    return this.maxSize;
823  }
824
825  @Override
826  public long getCurrentSize() {
827    return this.size.get();
828  }
829
830  @Override
831  public long getCurrentDataSize() {
832    return this.dataBlockSize.sum();
833  }
834
835  @Override
836  public long getFreeSize() {
837    return getMaxSize() - getCurrentSize();
838  }
839
840  @Override
841  public long size() {
842    return getMaxSize();
843  }
844
845  @Override
846  public long getBlockCount() {
847    return this.elements.get();
848  }
849
850  @Override
851  public long getDataBlockCount() {
852    return this.dataBlockElements.sum();
853  }
854
855  EvictionThread getEvictionThread() {
856    return this.evictionThread;
857  }
858
859  /*
860   * Eviction thread.  Sits in waiting state until an eviction is triggered
861   * when the cache size grows above the acceptable level.<p>
862   *
863   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
864   */
865  static class EvictionThread extends HasThread {
866
867    private WeakReference<LruBlockCache> cache;
868    private volatile boolean go = true;
869    // flag set after enter the run method, used for test
870    private boolean enteringRun = false;
871
872    public EvictionThread(LruBlockCache cache) {
873      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
874      setDaemon(true);
875      this.cache = new WeakReference<>(cache);
876    }
877
878    @Override
879    public void run() {
880      enteringRun = true;
881      while (this.go) {
882        synchronized (this) {
883          try {
884            this.wait(1000 * 10/*Don't wait for ever*/);
885          } catch (InterruptedException e) {
886            LOG.warn("Interrupted eviction thread ", e);
887            Thread.currentThread().interrupt();
888          }
889        }
890        LruBlockCache cache = this.cache.get();
891        if (cache == null) break;
892        cache.evict();
893      }
894    }
895
896    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
897        justification="This is what we want")
898    public void evict() {
899      synchronized (this) {
900        this.notifyAll();
901      }
902    }
903
904    synchronized void shutdown() {
905      this.go = false;
906      this.notifyAll();
907    }
908
909    /**
910     * Used for the test.
911     */
912    boolean isEnteringRun() {
913      return this.enteringRun;
914    }
915  }
916
917  /*
918   * Statistics thread.  Periodically prints the cache statistics to the log.
919   */
920  static class StatisticsThread extends Thread {
921
922    private final LruBlockCache lru;
923
924    public StatisticsThread(LruBlockCache lru) {
925      super("LruBlockCacheStats");
926      setDaemon(true);
927      this.lru = lru;
928    }
929
930    @Override
931    public void run() {
932      lru.logStats();
933    }
934  }
935
936  public void logStats() {
937    // Log size
938    long totalSize = heapSize();
939    long freeSize = maxSize - totalSize;
940    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
941        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
942        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
943        "blockCount=" + getBlockCount() + ", " +
944        "accesses=" + stats.getRequestCount() + ", " +
945        "hits=" + stats.getHitCount() + ", " +
946        "hitRatio=" + (stats.getHitCount() == 0 ?
947          "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
948        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
949        "cachingHits=" + stats.getHitCachingCount() + ", " +
950        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
951          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
952        "evictions=" + stats.getEvictionCount() + ", " +
953        "evicted=" + stats.getEvictedCount() + ", " +
954        "evictedPerRun=" + stats.evictedPerEviction());
955  }
956
957  /**
958   * Get counter statistics for this cache.
959   *
960   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
961   * of the eviction processes.
962   */
963  @Override
964  public CacheStats getStats() {
965    return this.stats;
966  }
967
968  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
969      (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) +
970      (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
971      + ClassSize.OBJECT);
972
973  @Override
974  public long heapSize() {
975    return getCurrentSize();
976  }
977
978  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
979    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
980    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
981           + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
982           + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
983  }
984
985  @Override
986  public Iterator<CachedBlock> iterator() {
987    final Iterator<LruCachedBlock> iterator = map.values().iterator();
988
989    return new Iterator<CachedBlock>() {
990      private final long now = System.nanoTime();
991
992      @Override
993      public boolean hasNext() {
994        return iterator.hasNext();
995      }
996
997      @Override
998      public CachedBlock next() {
999        final LruCachedBlock b = iterator.next();
1000        return new CachedBlock() {
1001          @Override
1002          public String toString() {
1003            return BlockCacheUtil.toString(this, now);
1004          }
1005
1006          @Override
1007          public BlockPriority getBlockPriority() {
1008            return b.getPriority();
1009          }
1010
1011          @Override
1012          public BlockType getBlockType() {
1013            return b.getBuffer().getBlockType();
1014          }
1015
1016          @Override
1017          public long getOffset() {
1018            return b.getCacheKey().getOffset();
1019          }
1020
1021          @Override
1022          public long getSize() {
1023            return b.getBuffer().heapSize();
1024          }
1025
1026          @Override
1027          public long getCachedTime() {
1028            return b.getCachedTime();
1029          }
1030
1031          @Override
1032          public String getFilename() {
1033            return b.getCacheKey().getHfileName();
1034          }
1035
1036          @Override
1037          public int compareTo(CachedBlock other) {
1038            int diff = this.getFilename().compareTo(other.getFilename());
1039            if (diff != 0) return diff;
1040            diff = Long.compare(this.getOffset(), other.getOffset());
1041            if (diff != 0) return diff;
1042            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1043              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1044            }
1045            return Long.compare(other.getCachedTime(), this.getCachedTime());
1046          }
1047
1048          @Override
1049          public int hashCode() {
1050            return b.hashCode();
1051          }
1052
1053          @Override
1054          public boolean equals(Object obj) {
1055            if (obj instanceof CachedBlock) {
1056              CachedBlock cb = (CachedBlock)obj;
1057              return compareTo(cb) == 0;
1058            } else {
1059              return false;
1060            }
1061          }
1062        };
1063      }
1064
1065      @Override
1066      public void remove() {
1067        throw new UnsupportedOperationException();
1068      }
1069    };
1070  }
1071
1072  // Simple calculators of sizes given factors and maxSize
1073
1074  long acceptableSize() {
1075    return (long)Math.floor(this.maxSize * this.acceptableFactor);
1076  }
1077  private long minSize() {
1078    return (long)Math.floor(this.maxSize * this.minFactor);
1079  }
1080  private long singleSize() {
1081    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1082  }
1083  private long multiSize() {
1084    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1085  }
1086  private long memorySize() {
1087    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1088  }
1089
1090  @Override
1091  public void shutdown() {
1092    if (victimHandler != null) {
1093      victimHandler.shutdown();
1094    }
1095    this.scheduleThreadPool.shutdown();
1096    for (int i = 0; i < 10; i++) {
1097      if (!this.scheduleThreadPool.isShutdown()) {
1098        try {
1099          Thread.sleep(10);
1100        } catch (InterruptedException e) {
1101          LOG.warn("Interrupted while sleeping");
1102          Thread.currentThread().interrupt();
1103          break;
1104        }
1105      }
1106    }
1107
1108    if (!this.scheduleThreadPool.isShutdown()) {
1109      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1110      LOG.debug("Still running " + runnables);
1111    }
1112    this.evictionThread.shutdown();
1113  }
1114
1115  /** Clears the cache. Used in tests. */
1116  @VisibleForTesting
1117  public void clearCache() {
1118    this.map.clear();
1119    this.elements.set(0);
1120  }
1121
1122  /**
1123   * Used in testing. May be very inefficient.
1124   *
1125   * @return the set of cached file names
1126   */
1127  @VisibleForTesting
1128  SortedSet<String> getCachedFileNamesForTest() {
1129    SortedSet<String> fileNames = new TreeSet<>();
1130    for (BlockCacheKey cacheKey : map.keySet()) {
1131      fileNames.add(cacheKey.getHfileName());
1132    }
1133    return fileNames;
1134  }
1135
1136  @VisibleForTesting
1137  Map<BlockType, Integer> getBlockTypeCountsForTest() {
1138    Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class);
1139    for (LruCachedBlock cb : map.values()) {
1140      BlockType blockType = cb.getBuffer().getBlockType();
1141      Integer count = counts.get(blockType);
1142      counts.put(blockType, (count == null ? 0 : count) + 1);
1143    }
1144    return counts;
1145  }
1146
1147  @VisibleForTesting
1148  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1149    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1150    for (LruCachedBlock block : map.values()) {
1151      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1152      Integer count = counts.get(encoding);
1153      counts.put(encoding, (count == null ? 0 : count) + 1);
1154    }
1155    return counts;
1156  }
1157
1158  public void setVictimCache(BlockCache handler) {
1159    assert victimHandler == null;
1160    victimHandler = handler;
1161  }
1162
1163  @VisibleForTesting
1164  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1165    return map;
1166  }
1167
1168  @Override
1169  public BlockCache[] getBlockCaches() {
1170    if (victimHandler != null) {
1171      return new BlockCache[] { this, this.victimHandler };
1172    }
1173    return null;
1174  }
1175}