001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.io.HeapSize;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.ClassSize;
042import org.apache.hadoop.hbase.util.HasThread;
043import org.apache.hadoop.util.StringUtils;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
050import org.apache.hbase.thirdparty.com.google.common.base.Objects;
051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
052
053/**
054 * A block cache implementation that is memory-aware using {@link HeapSize},
055 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
056 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
057 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
058 *
059 * Contains three levels of block priority to allow for scan-resistance and in-memory families
060 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
061 * family is a column family that should be served from memory if possible):
062 * single-access, multiple-accesses, and in-memory priority.
063 * A block is added with an in-memory priority flag if
064 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
065 * single access priority the first time it is read into this block cache.  If a block is
066 * accessed again while in cache, it is marked as a multiple access priority block.  This
067 * delineation of blocks is used to prevent scans from thrashing the cache adding a
068 * least-frequently-used element to the eviction algorithm.<p>
069 *
070 * Each priority is given its own chunk of the total cache to ensure
071 * fairness during eviction.  Each priority will retain close to its maximum
072 * size, however, if any priority is not using its entire chunk the others
073 * are able to grow beyond their chunk size.<p>
074 *
075 * Instantiated at a minimum with the total size and average block size.
076 * All sizes are in bytes.  The block size is not especially important as this
077 * cache is fully dynamic in its sizing of blocks.  It is only used for
078 * pre-allocating data structures and in initial heap estimation of the map.<p>
079 *
080 * The detailed constructor defines the sizes for the three priorities (they
081 * should total to the <code>maximum size</code> defined).  It also sets the levels that
082 * trigger and control the eviction thread.<p>
083 *
084 * The <code>acceptable size</code> is the cache size level which triggers the eviction
085 * process to start.  It evicts enough blocks to get the size below the
086 * minimum size specified.<p>
087 *
088 * Eviction happens in a separate thread and involves a single full-scan
089 * of the map.  It determines how many bytes must be freed to reach the minimum
090 * size, and then while scanning determines the fewest least-recently-used
091 * blocks necessary from each of the three priorities (would be 3 times bytes
092 * to free).  It then uses the priority chunk sizes to evict fairly according
093 * to the relative sizes and usage.
094 */
095@InterfaceAudience.Private
096public class LruBlockCache implements FirstLevelBlockCache {
097
098  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
099
100  /**
101   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
102   * evicting during an eviction run till the cache size is down to 80% of the total.
103   */
104  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
105
106  /**
107   * Acceptable size of cache (no evictions if size < acceptable)
108   */
109  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
110      "hbase.lru.blockcache.acceptable.factor";
111
112  /**
113   * Hard capacity limit of cache, will reject any put if size > this * acceptable
114   */
115  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
116      "hbase.lru.blockcache.hard.capacity.limit.factor";
117  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
118      "hbase.lru.blockcache.single.percentage";
119  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
120      "hbase.lru.blockcache.multi.percentage";
121  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
122      "hbase.lru.blockcache.memory.percentage";
123
124  /**
125   * Configuration key to force data-block always (except in-memory are too much)
126   * cached in memory for in-memory hfile, unlike inMemory, which is a column-family
127   * configuration, inMemoryForceMode is a cluster-wide configuration
128   */
129  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
130      "hbase.lru.rs.inmemoryforcemode";
131
132  /* Default Configuration Parameters*/
133
134  /* Backing Concurrent Map Configuration */
135  static final float DEFAULT_LOAD_FACTOR = 0.75f;
136  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
137
138  /* Eviction thresholds */
139  private static final float DEFAULT_MIN_FACTOR = 0.95f;
140  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
141
142  /* Priority buckets */
143  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
144  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
145  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
146
147  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
148
149  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
150
151  /* Statistics thread */
152  private static final int STAT_THREAD_PERIOD = 60 * 5;
153  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
154  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
155
156  /**
157   * Defined the cache map as {@link ConcurrentHashMap} here, because in
158   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
159   * (key, func). Besides, the func method must execute exactly once only when the key is present
160   * and under the lock context, otherwise the reference count will be messed up. Notice that the
161   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
162   */
163  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
164
165  /** Eviction lock (locked when eviction in process) */
166  private transient final ReentrantLock evictionLock = new ReentrantLock(true);
167
168  private final long maxBlockSize;
169
170  /** Volatile boolean to track if we are in an eviction process or not */
171  private volatile boolean evictionInProgress = false;
172
173  /** Eviction thread */
174  private transient final EvictionThread evictionThread;
175
176  /** Statistics thread schedule pool (for heavy debugging, could remove) */
177  private transient final ScheduledExecutorService scheduleThreadPool =
178    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
179      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
180
181  /** Current size of cache */
182  private final AtomicLong size;
183
184  /** Current size of data blocks */
185  private final LongAdder dataBlockSize;
186
187  /** Current number of cached elements */
188  private final AtomicLong elements;
189
190  /** Current number of cached data block elements */
191  private final LongAdder dataBlockElements;
192
193  /** Cache access count (sequential ID) */
194  private final AtomicLong count;
195
196  /** hard capacity limit */
197  private float hardCapacityLimitFactor;
198
199  /** Cache statistics */
200  private final CacheStats stats;
201
202  /** Maximum allowable size of cache (block put if size > max, evict) */
203  private long maxSize;
204
205  /** Approximate block size */
206  private long blockSize;
207
208  /** Acceptable size of cache (no evictions if size < acceptable) */
209  private float acceptableFactor;
210
211  /** Minimum threshold of cache (when evicting, evict until size < min) */
212  private float minFactor;
213
214  /** Single access bucket size */
215  private float singleFactor;
216
217  /** Multiple access bucket size */
218  private float multiFactor;
219
220  /** In-memory bucket size */
221  private float memoryFactor;
222
223  /** Overhead of the structure itself */
224  private long overhead;
225
226  /** Whether in-memory hfile's data block has higher priority when evicting */
227  private boolean forceInMemory;
228
229  /**
230   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
231   * external cache as L2.
232   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
233   */
234  private transient BlockCache victimHandler = null;
235
236  /**
237   * Default constructor.  Specify maximum size and expected average block
238   * size (approximation is fine).
239   *
240   * <p>All other factors will be calculated based on defaults specified in
241   * this class.
242   *
243   * @param maxSize   maximum size of cache, in bytes
244   * @param blockSize approximate size of each block, in bytes
245   */
246  public LruBlockCache(long maxSize, long blockSize) {
247    this(maxSize, blockSize, true);
248  }
249
250  /**
251   * Constructor used for testing.  Allows disabling of the eviction thread.
252   */
253  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
254    this(maxSize, blockSize, evictionThread,
255        (int) Math.ceil(1.2 * maxSize / blockSize),
256        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
257        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
258        DEFAULT_SINGLE_FACTOR,
259        DEFAULT_MULTI_FACTOR,
260        DEFAULT_MEMORY_FACTOR,
261        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
262        false,
263        DEFAULT_MAX_BLOCK_SIZE);
264  }
265
  /**
   * Constructor that reads the eviction thresholds and priority-bucket factors from the
   * given {@link Configuration}, falling back to the class defaults for any unset key.
   *
   * @param maxSize        maximum size of cache, in bytes
   * @param blockSize      approximate size of each block, in bytes
   * @param evictionThread whether to run evictions in a background thread
   * @param conf           configuration to read the factors from
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread,
        // Initial map capacity: ~120% of the expected number of blocks at full size.
        (int) Math.ceil(1.2 * maxSize / blockSize),
        DEFAULT_LOAD_FACTOR,
        DEFAULT_CONCURRENCY_LEVEL,
        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
        conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
                      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
  }
281
  /**
   * Constructor with a background eviction thread and factors read from {@code conf}.
   *
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   * @param conf      configuration to read the factors from
   */
  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }
285
286  /**
287   * Configurable constructor.  Use this constructor if not using defaults.
288   *
289   * @param maxSize             maximum size of this cache, in bytes
290   * @param blockSize           expected average size of blocks, in bytes
291   * @param evictionThread      whether to run evictions in a bg thread or not
292   * @param mapInitialSize      initial size of backing ConcurrentHashMap
293   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
294   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
295   * @param minFactor           percentage of total size that eviction will evict until
296   * @param acceptableFactor    percentage of total size that triggers eviction
297   * @param singleFactor        percentage of total size for single-access blocks
298   * @param multiFactor         percentage of total size for multiple-access blocks
299   * @param memoryFactor        percentage of total size for in-memory blocks
300   */
301  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
302      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
303      float minFactor, float acceptableFactor, float singleFactor,
304      float multiFactor, float memoryFactor, float hardLimitFactor,
305      boolean forceInMemory, long maxBlockSize) {
306    this.maxBlockSize = maxBlockSize;
307    if(singleFactor + multiFactor + memoryFactor != 1 ||
308        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
309      throw new IllegalArgumentException("Single, multi, and memory factors " +
310          " should be non-negative and total 1.0");
311    }
312    if (minFactor >= acceptableFactor) {
313      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
314    }
315    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
316      throw new IllegalArgumentException("all factors must be < 1");
317    }
318    this.maxSize = maxSize;
319    this.blockSize = blockSize;
320    this.forceInMemory = forceInMemory;
321    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
322    this.minFactor = minFactor;
323    this.acceptableFactor = acceptableFactor;
324    this.singleFactor = singleFactor;
325    this.multiFactor = multiFactor;
326    this.memoryFactor = memoryFactor;
327    this.stats = new CacheStats(this.getClass().getSimpleName());
328    this.count = new AtomicLong(0);
329    this.elements = new AtomicLong(0);
330    this.dataBlockElements = new LongAdder();
331    this.dataBlockSize = new LongAdder();
332    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
333    this.size = new AtomicLong(this.overhead);
334    this.hardCapacityLimitFactor = hardLimitFactor;
335    if (evictionThread) {
336      this.evictionThread = new EvictionThread(this);
337      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
338    } else {
339      this.evictionThread = null;
340    }
341    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
342    // every five minutes.
343    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
344                                                STAT_THREAD_PERIOD, TimeUnit.SECONDS);
345  }
346
347  @Override
348  public void setVictimCache(BlockCache victimCache) {
349    if (victimHandler != null) {
350      throw new IllegalArgumentException("The victim cache has already been set");
351    }
352    victimHandler = requireNonNull(victimCache);
353  }
354
355  @Override
356  public void setMaxSize(long maxSize) {
357    this.maxSize = maxSize;
358    if (this.size.get() > acceptableSize() && !evictionInProgress) {
359      runEviction();
360    }
361  }
362
363  /**
364   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
365   * access will be more faster then off-heap, the small index block or meta block cached in
366   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
367   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
368   * heap size will be messed up. Here we will clone the block into an heap block if it's an
369   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
370   * the block (HBASE-22127): <br>
371   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
372   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
373   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
374   * JVM, so need a retain here.
375   * @param buf the original block
376   * @return an block with an heap memory backend.
377   */
378  private Cacheable asReferencedHeapBlock(Cacheable buf) {
379    if (buf instanceof HFileBlock) {
380      HFileBlock blk = ((HFileBlock) buf);
381      if (blk.isSharedMem()) {
382        return HFileBlock.deepCloneOnHeap(blk);
383      }
384    }
385    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
386    return buf.retain();
387  }
388
389  // BlockCache implementation
390
391  /**
392   * Cache the block with the specified name and buffer.
393   * <p>
394   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
395   * this can happen, for which we compare the buffer contents.
396   *
397   * @param cacheKey block's cache key
398   * @param buf      block buffer
399   * @param inMemory if block is in-memory
400   */
401  @Override
402  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
403    if (buf.heapSize() > maxBlockSize) {
404      // If there are a lot of blocks that are too
405      // big this can make the logs way too noisy.
406      // So we log 2%
407      if (stats.failInsert() % 50 == 0) {
408        LOG.warn("Trying to cache too large a block "
409            + cacheKey.getHfileName() + " @ "
410            + cacheKey.getOffset()
411            + " is " + buf.heapSize()
412            + " which is larger than " + maxBlockSize);
413      }
414      return;
415    }
416
417    LruCachedBlock cb = map.get(cacheKey);
418    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
419      return;
420    }
421    long currentSize = size.get();
422    long currentAcceptableSize = acceptableSize();
423    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
424    if (currentSize >= hardLimitSize) {
425      stats.failInsert();
426      if (LOG.isTraceEnabled()) {
427        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
428          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
429          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
430          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
431      }
432      if (!evictionInProgress) {
433        runEviction();
434      }
435      return;
436    }
437    // Ensure that the block is an heap one.
438    buf = asReferencedHeapBlock(buf);
439    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
440    long newSize = updateSizeMetrics(cb, false);
441    map.put(cacheKey, cb);
442    long val = elements.incrementAndGet();
443    if (buf.getBlockType().isData()) {
444       dataBlockElements.increment();
445    }
446    if (LOG.isTraceEnabled()) {
447      long size = map.size();
448      assertCounterSanity(size, val);
449    }
450    if (newSize > currentAcceptableSize && !evictionInProgress) {
451      runEviction();
452    }
453  }
454
455  /**
456   * Sanity-checking for parity between actual block cache content and metrics.
457   * Intended only for use with TRACE level logging and -ea JVM.
458   */
459  private static void assertCounterSanity(long mapSize, long counterVal) {
460    if (counterVal < 0) {
461      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
462        ", mapSize=" + mapSize);
463      return;
464    }
465    if (mapSize < Integer.MAX_VALUE) {
466      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
467      if (pct_diff > 0.05) {
468        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
469          ", mapSize=" + mapSize);
470      }
471    }
472  }
473
474  /**
475   * Cache the block with the specified name and buffer.
476   * <p>
477   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
478   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
479   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
480   * otherwise the caching size is based on off-heap.
481   * @param cacheKey block's cache key
482   * @param buf block buffer
483   */
484  @Override
485  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
486    cacheBlock(cacheKey, buf, false);
487  }
488
489  /**
490   * Helper function that updates the local size counter and also updates any
491   * per-cf or per-blocktype metrics it can discern from given
492   * {@link LruCachedBlock}
493   */
494  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
495    long heapsize = cb.heapSize();
496    BlockType bt = cb.getBuffer().getBlockType();
497    if (evict) {
498      heapsize *= -1;
499    }
500    if (bt != null && bt.isData()) {
501       dataBlockSize.add(heapsize);
502    }
503    return size.addAndGet(heapsize);
504  }
505
506  /**
507   * Get the buffer of the block with the specified name.
508   *
509   * @param cacheKey           block's cache key
510   * @param caching            true if the caller caches blocks on cache misses
511   * @param repeat             Whether this is a repeat lookup for the same block
512   *                           (used to avoid double counting cache misses when doing double-check
513   *                           locking)
514   * @param updateCacheMetrics Whether to update cache metrics or not
515   *
516   * @return buffer of specified cache key, or null if not in cache
517   */
518  @Override
519  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
520      boolean updateCacheMetrics) {
521    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
522      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
523      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
524      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
525      // see HBASE-22422.
526      val.getBuffer().retain();
527      return val;
528    });
529    if (cb == null) {
530      if (!repeat && updateCacheMetrics) {
531        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
532      }
533      // If there is another block cache then try and read there.
534      // However if this is a retry ( second time in double checked locking )
535      // And it's already a miss then the l2 will also be a miss.
536      if (victimHandler != null && !repeat) {
537        // The handler will increase result's refCnt for RPC, so need no extra retain.
538        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
539        // Promote this to L1.
540        if (result != null) {
541          if (caching) {
542            cacheBlock(cacheKey, result, /* inMemory = */ false);
543          }
544        }
545        return result;
546      }
547      return null;
548    }
549    if (updateCacheMetrics) {
550      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
551    }
552    cb.access(count.incrementAndGet());
553    return cb.getBuffer();
554  }
555
556  /**
557   * Whether the cache contains block with specified cacheKey
558   *
559   * @return true if contains the block
560   */
561  @Override
562  public boolean containsBlock(BlockCacheKey cacheKey) {
563    return map.containsKey(cacheKey);
564  }
565
566  @Override
567  public boolean evictBlock(BlockCacheKey cacheKey) {
568    LruCachedBlock cb = map.get(cacheKey);
569    return cb != null && evictBlock(cb, false) > 0;
570  }
571
572  /**
573   * Evicts all blocks for a specific HFile. This is an
574   * expensive operation implemented as a linear-time search through all blocks
575   * in the cache. Ideally this should be a search in a log-access-time map.
576   *
577   * <p>
578   * This is used for evict-on-close to remove all blocks of a specific HFile.
579   *
580   * @return the number of blocks evicted
581   */
582  @Override
583  public int evictBlocksByHfileName(String hfileName) {
584    int numEvicted = 0;
585    for (BlockCacheKey key : map.keySet()) {
586      if (key.getHfileName().equals(hfileName)) {
587        if (evictBlock(key))
588          ++numEvicted;
589      }
590    }
591    if (victimHandler != null) {
592      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
593    }
594    return numEvicted;
595  }
596
597  /**
598   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
599   * block may be read again later
600   *
601   * @param evictedByEvictionProcess true if the given block is evicted by
602   *          EvictionThread
603   * @return the heap size of evicted block
604   */
605  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
606    LruCachedBlock previous = map.remove(block.getCacheKey());
607    if (previous == null) {
608      return 0;
609    }
610    updateSizeMetrics(block, true);
611    long val = elements.decrementAndGet();
612    if (LOG.isTraceEnabled()) {
613      long size = map.size();
614      assertCounterSanity(size, val);
615    }
616    if (block.getBuffer().getBlockType().isData()) {
617      dataBlockElements.decrement();
618    }
619    if (evictedByEvictionProcess) {
620      // When the eviction of the block happened because of invalidation of HFiles, no need to
621      // update the stats counter.
622      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
623      if (victimHandler != null) {
624        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
625      }
626    }
627    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
628    // NOT move this up because if do that then the victimHandler may access the buffer with
629    // refCnt = 0 which is disallowed.
630    previous.getBuffer().release();
631    return block.heapSize();
632  }
633
634  /**
635   * Multi-threaded call to run the eviction process.
636   */
637  private void runEviction() {
638    if (evictionThread == null) {
639      evict();
640    } else {
641      evictionThread.evict();
642    }
643  }
644
  /** Exposed for tests: whether an eviction pass is currently running. */
  @VisibleForTesting
  boolean isEvictionInProgress() {
    return evictionInProgress;
  }
649
  /**
   * Exposed for tests: the fixed structural overhead, in bytes, of this cache (the value the
   * size counter starts at in the constructor).
   */
  @VisibleForTesting
  long getOverhead() {
    return overhead;
  }
654
655  /**
656   * Eviction method.
657   */
658  void evict() {
659
660    // Ensure only one eviction at a time
661    if(!evictionLock.tryLock()) return;
662
663    try {
664      evictionInProgress = true;
665      long currentSize = this.size.get();
666      long bytesToFree = currentSize - minSize();
667
668      if (LOG.isTraceEnabled()) {
669        LOG.trace("Block cache LRU eviction started; Attempting to free " +
670          StringUtils.byteDesc(bytesToFree) + " of total=" +
671          StringUtils.byteDesc(currentSize));
672      }
673
674      if (bytesToFree <= 0) return;
675
676      // Instantiate priority buckets
677      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
678      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
679      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
680
681      // Scan entire map putting into appropriate buckets
682      for (LruCachedBlock cachedBlock : map.values()) {
683        switch (cachedBlock.getPriority()) {
684          case SINGLE: {
685            bucketSingle.add(cachedBlock);
686            break;
687          }
688          case MULTI: {
689            bucketMulti.add(cachedBlock);
690            break;
691          }
692          case MEMORY: {
693            bucketMemory.add(cachedBlock);
694            break;
695          }
696        }
697      }
698
699      long bytesFreed = 0;
700      if (forceInMemory || memoryFactor > 0.999f) {
701        long s = bucketSingle.totalSize();
702        long m = bucketMulti.totalSize();
703        if (bytesToFree > (s + m)) {
704          // this means we need to evict blocks in memory bucket to make room,
705          // so the single and multi buckets will be emptied
706          bytesFreed = bucketSingle.free(s);
707          bytesFreed += bucketMulti.free(m);
708          if (LOG.isTraceEnabled()) {
709            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
710              " from single and multi buckets");
711          }
712          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
713          if (LOG.isTraceEnabled()) {
714            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
715              " total from all three buckets ");
716          }
717        } else {
718          // this means no need to evict block in memory bucket,
719          // and we try best to make the ratio between single-bucket and
720          // multi-bucket is 1:2
721          long bytesRemain = s + m - bytesToFree;
722          if (3 * s <= bytesRemain) {
723            // single-bucket is small enough that no eviction happens for it
724            // hence all eviction goes from multi-bucket
725            bytesFreed = bucketMulti.free(bytesToFree);
726          } else if (3 * m <= 2 * bytesRemain) {
727            // multi-bucket is small enough that no eviction happens for it
728            // hence all eviction goes from single-bucket
729            bytesFreed = bucketSingle.free(bytesToFree);
730          } else {
731            // both buckets need to evict some blocks
732            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
733            if (bytesFreed < bytesToFree) {
734              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
735            }
736          }
737        }
738      } else {
739        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
740
741        bucketQueue.add(bucketSingle);
742        bucketQueue.add(bucketMulti);
743        bucketQueue.add(bucketMemory);
744
745        int remainingBuckets = bucketQueue.size();
746
747        BlockBucket bucket;
748        while ((bucket = bucketQueue.poll()) != null) {
749          long overflow = bucket.overflow();
750          if (overflow > 0) {
751            long bucketBytesToFree =
752                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
753            bytesFreed += bucket.free(bucketBytesToFree);
754          }
755          remainingBuckets--;
756        }
757      }
758      if (LOG.isTraceEnabled()) {
759        long single = bucketSingle.totalSize();
760        long multi = bucketMulti.totalSize();
761        long memory = bucketMemory.totalSize();
762        LOG.trace("Block cache LRU eviction completed; " +
763          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
764          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
765          "single=" + StringUtils.byteDesc(single) + ", " +
766          "multi=" + StringUtils.byteDesc(multi) + ", " +
767          "memory=" + StringUtils.byteDesc(memory));
768      }
769    } finally {
770      stats.evict();
771      evictionInProgress = false;
772      evictionLock.unlock();
773    }
774  }
775
  @Override
  public String toString() {
    // Summarizes current occupancy plus the configured sizing factors; useful
    // in logs when diagnosing eviction behavior.
    return MoreObjects.toStringHelper(this)
      .add("blockCount", getBlockCount())
      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
      .add("heapSize", StringUtils.byteDesc(heapSize()))
      .add("minSize", StringUtils.byteDesc(minSize()))
      .add("minFactor", minFactor)
      .add("multiSize", StringUtils.byteDesc(multiSize()))
      .add("multiFactor", multiFactor)
      .add("singleSize", StringUtils.byteDesc(singleSize()))
      .add("singleFactor", singleFactor)
      .toString();
  }
792
793  /**
794   * Used to group blocks into priority buckets.  There will be a BlockBucket
795   * for each priority (single, multi, memory).  Once bucketed, the eviction
796   * algorithm takes the appropriate number of elements out of each according
797   * to configuration parameters and their relatives sizes.
798   */
799  private class BlockBucket implements Comparable<BlockBucket> {
800
801    private final String name;
802    private LruCachedBlockQueue queue;
803    private long totalSize = 0;
804    private long bucketSize;
805
806    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
807      this.name = name;
808      this.bucketSize = bucketSize;
809      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
810      totalSize = 0;
811    }
812
813    public void add(LruCachedBlock block) {
814      totalSize += block.heapSize();
815      queue.add(block);
816    }
817
818    public long free(long toFree) {
819      if (LOG.isTraceEnabled()) {
820        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
821      }
822      LruCachedBlock cb;
823      long freedBytes = 0;
824      while ((cb = queue.pollLast()) != null) {
825        freedBytes += evictBlock(cb, true);
826        if (freedBytes >= toFree) {
827          return freedBytes;
828        }
829      }
830      if (LOG.isTraceEnabled()) {
831        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
832      }
833      return freedBytes;
834    }
835
836    public long overflow() {
837      return totalSize - bucketSize;
838    }
839
840    public long totalSize() {
841      return totalSize;
842    }
843
844    @Override
845    public int compareTo(BlockBucket that) {
846      return Long.compare(this.overflow(), that.overflow());
847    }
848
849    @Override
850    public boolean equals(Object that) {
851      if (that == null || !(that instanceof BlockBucket)) {
852        return false;
853      }
854      return compareTo((BlockBucket)that) == 0;
855    }
856
857    @Override
858    public int hashCode() {
859      return Objects.hashCode(name, bucketSize, queue, totalSize);
860    }
861
862    @Override
863    public String toString() {
864      return MoreObjects.toStringHelper(this)
865        .add("name", name)
866        .add("totalSize", StringUtils.byteDesc(totalSize))
867        .add("bucketSize", StringUtils.byteDesc(bucketSize))
868        .toString();
869    }
870  }
871
872  /**
873   * Get the maximum size of this cache.
874   *
875   * @return max size in bytes
876   */
877
878  @Override
879  public long getMaxSize() {
880    return this.maxSize;
881  }
882
  /**
   * @return current occupied size of the cache in bytes, as tracked by the
   *         {@code size} counter updated on cache/evict.
   */
  @Override
  public long getCurrentSize() {
    return this.size.get();
  }
887
  /**
   * @return current bytes attributed to data blocks, from the
   *         {@code dataBlockSize} accumulator.
   */
  @Override
  public long getCurrentDataSize() {
    return this.dataBlockSize.sum();
  }
892
893  @Override
894  public long getFreeSize() {
895    return getMaxSize() - getCurrentSize();
896  }
897
  /**
   * @return total configured capacity of the cache in bytes (not current
   *         occupancy; see {@link #getCurrentSize()} for that).
   */
  @Override
  public long size() {
    return getMaxSize();
  }
902
  /** @return number of blocks currently cached, from the {@code elements} counter. */
  @Override
  public long getBlockCount() {
    return this.elements.get();
  }
907
  /** @return number of data blocks currently cached, from the {@code dataBlockElements} accumulator. */
  @Override
  public long getDataBlockCount() {
    return this.dataBlockElements.sum();
  }
912
  /** Package-private accessor to the background eviction thread; used by tests. */
  EvictionThread getEvictionThread() {
    return this.evictionThread;
  }
916
917  /*
918   * Eviction thread.  Sits in waiting state until an eviction is triggered
919   * when the cache size grows above the acceptable level.<p>
920   *
921   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
922   */
923  static class EvictionThread extends HasThread {
924
925    private WeakReference<LruBlockCache> cache;
926    private volatile boolean go = true;
927    // flag set after enter the run method, used for test
928    private boolean enteringRun = false;
929
930    public EvictionThread(LruBlockCache cache) {
931      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
932      setDaemon(true);
933      this.cache = new WeakReference<>(cache);
934    }
935
936    @Override
937    public void run() {
938      enteringRun = true;
939      while (this.go) {
940        synchronized (this) {
941          try {
942            this.wait(1000 * 10/*Don't wait for ever*/);
943          } catch (InterruptedException e) {
944            LOG.warn("Interrupted eviction thread ", e);
945            Thread.currentThread().interrupt();
946          }
947        }
948        LruBlockCache cache = this.cache.get();
949        if (cache == null) break;
950        cache.evict();
951      }
952    }
953
954    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
955        justification="This is what we want")
956    public void evict() {
957      synchronized (this) {
958        this.notifyAll();
959      }
960    }
961
962    synchronized void shutdown() {
963      this.go = false;
964      this.notifyAll();
965    }
966
967    /**
968     * Used for the test.
969     */
970    boolean isEnteringRun() {
971      return this.enteringRun;
972    }
973  }
974
975  /*
976   * Statistics thread.  Periodically prints the cache statistics to the log.
977   */
978  static class StatisticsThread extends Thread {
979
980    private final LruBlockCache lru;
981
982    public StatisticsThread(LruBlockCache lru) {
983      super("LruBlockCacheStats");
984      setDaemon(true);
985      this.lru = lru;
986    }
987
988    @Override
989    public void run() {
990      lru.logStats();
991    }
992  }
993
994  public void logStats() {
995    // Log size
996    long totalSize = heapSize();
997    long freeSize = maxSize - totalSize;
998    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
999        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
1000        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
1001        "blockCount=" + getBlockCount() + ", " +
1002        "accesses=" + stats.getRequestCount() + ", " +
1003        "hits=" + stats.getHitCount() + ", " +
1004        "hitRatio=" + (stats.getHitCount() == 0 ?
1005          "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
1006        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
1007        "cachingHits=" + stats.getHitCachingCount() + ", " +
1008        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
1009          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
1010        "evictions=" + stats.getEvictionCount() + ", " +
1011        "evicted=" + stats.getEvictedCount() + ", " +
1012        "evictedPerRun=" + stats.evictedPerEviction());
1013  }
1014
1015  /**
1016   * Get counter statistics for this cache.
1017   *
1018   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
1019   * of the eviction processes.
1020   */
1021  @Override
1022  public CacheStats getStats() {
1023    return this.stats;
1024  }
1025
  // Fixed heap overhead of an LruBlockCache instance itself (object header
  // plus declared fields: 4 longs, 11 references, 6 floats, 2 booleans).
  // NOTE(review): these counts must track the class's field list, which is
  // not fully visible here -- verify when fields are added or removed.
  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) +
      (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
      + ClassSize.OBJECT);
1030
  /** @return heap occupied by the cache contents; delegates to {@link #getCurrentSize()}. */
  @Override
  public long heapSize() {
    return getCurrentSize();
  }
1035
1036  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1037    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1038    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1039           + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1040           + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1041  }
1042
  /**
   * Iterates over the cached blocks, exposing each as a read-only
   * {@link CachedBlock} view.  Backed by the live {@code map.values()}
   * iterator -- presumably the weakly-consistent ConcurrentHashMap iterator,
   * so concurrent cache mutations may or may not be reflected; verify against
   * the map's declaration.
   */
  @Override
  public Iterator<CachedBlock> iterator() {
    final Iterator<LruCachedBlock> iterator = map.values().iterator();

    return new Iterator<CachedBlock>() {
      // Single timestamp taken at iterator creation, shared by all toString()
      // calls so ages are computed against a common "now".
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public CachedBlock next() {
        final LruCachedBlock b = iterator.next();
        // Adapter that delegates every accessor to the underlying LruCachedBlock.
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return b.getPriority();
          }

          @Override
          public BlockType getBlockType() {
            return b.getBuffer().getBlockType();
          }

          @Override
          public long getOffset() {
            return b.getCacheKey().getOffset();
          }

          @Override
          public long getSize() {
            return b.getBuffer().heapSize();
          }

          @Override
          public long getCachedTime() {
            return b.getCachedTime();
          }

          @Override
          public String getFilename() {
            return b.getCacheKey().getHfileName();
          }

          // Orders by filename, then offset, then most-recently-cached first.
          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            // Negative cached times are treated as corrupt state.
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          // NOTE(review): hashCode delegates to the wrapped block while equals
          // uses compareTo above, so equals/hashCode may be inconsistent --
          // confirm these views are never used as hash keys.
          @Override
          public int hashCode() {
            return b.hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock)obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      // Read-only view: removal must go through the cache's eviction paths.
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
1129
  // Simple calculators of sizes given factors and maxSize

  /** @return cache size threshold derived from the acceptable factor. */
  long acceptableSize() {
    return (long)Math.floor(this.maxSize * this.acceptableFactor);
  }
  /** @return cache size floor derived from the min factor. */
  private long minSize() {
    return (long)Math.floor(this.maxSize * this.minFactor);
  }
  /** @return the single-access bucket's share: singleFactor of minSize. */
  private long singleSize() {
    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
  }
  /** @return the multi-access bucket's share: multiFactor of minSize. */
  private long multiSize() {
    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
  }
  /** @return the in-memory bucket's share: memoryFactor of minSize. */
  private long memorySize() {
    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }
1147
1148  @Override
1149  public void shutdown() {
1150    if (victimHandler != null) {
1151      victimHandler.shutdown();
1152    }
1153    this.scheduleThreadPool.shutdown();
1154    for (int i = 0; i < 10; i++) {
1155      if (!this.scheduleThreadPool.isShutdown()) {
1156        try {
1157          Thread.sleep(10);
1158        } catch (InterruptedException e) {
1159          LOG.warn("Interrupted while sleeping");
1160          Thread.currentThread().interrupt();
1161          break;
1162        }
1163      }
1164    }
1165
1166    if (!this.scheduleThreadPool.isShutdown()) {
1167      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1168      LOG.debug("Still running " + runnables);
1169    }
1170    this.evictionThread.shutdown();
1171  }
1172
  /** Clears the cache. Used in tests. */
  @VisibleForTesting
  public void clearCache() {
    // NOTE(review): only the block map and element count are reset here; the
    // size and dataBlock accounting fields are left untouched -- confirm the
    // tests using this do not depend on those counters after a clear.
    this.map.clear();
    this.elements.set(0);
  }
1179
1180  /**
1181   * Used in testing. May be very inefficient.
1182   *
1183   * @return the set of cached file names
1184   */
1185  @VisibleForTesting
1186  SortedSet<String> getCachedFileNamesForTest() {
1187    SortedSet<String> fileNames = new TreeSet<>();
1188    for (BlockCacheKey cacheKey : map.keySet()) {
1189      fileNames.add(cacheKey.getHfileName());
1190    }
1191    return fileNames;
1192  }
1193
1194  @VisibleForTesting
1195  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1196    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1197    for (LruCachedBlock block : map.values()) {
1198      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1199      Integer count = counts.get(encoding);
1200      counts.put(encoding, (count == null ? 0 : count) + 1);
1201    }
1202    return counts;
1203  }
1204
  /** Direct access to the live backing map; test-only, mutations affect the cache. */
  @VisibleForTesting
  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
    return map;
  }
1209
1210  @Override
1211  public BlockCache[] getBlockCaches() {
1212    if (victimHandler != null) {
1213      return new BlockCache[] { this, this.victimHandler };
1214    }
1215    return null;
1216  }
1217}