001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.io.HeapSize;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
047import org.apache.hbase.thirdparty.com.google.common.base.Objects;
048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
049
050/**
051 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
052 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
053 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
054 * operations.
 * <p>
 * Contains three levels of block priority to allow for scan-resistance and in-memory families
 * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (an
 * in-memory column family is a column family that should be served from memory if possible):
 * single-access, multiple-access, and in-memory priority. A block is added with an in-memory
 * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()};
 * otherwise a block gets single-access priority the first time it is read into this block cache.
 * If a block is accessed again while in cache, it is marked as a multiple-access priority block.
 * This delineation of blocks prevents scans from thrashing the cache, effectively adding a
 * least-frequently-used element to the eviction algorithm.
 * <p>
 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
 * priority will retain close to its maximum size; however, if any priority is not using its entire
 * chunk, the others are able to grow beyond their chunk size.
 * <p>
 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
 * is only used for pre-allocating data structures and in initial heap estimation of the map.
 * <p>
 * The detailed constructor defines the fractions of the total size given to the three priorities
 * (they must sum to 1.0, i.e. together they cover the <code>maximum size</code>). It also sets the
 * levels that trigger and control the eviction thread.
 * <p>
 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
 * start. Eviction then frees enough blocks to bring the size below the <code>minimum size</code>.
 * <p>
 * Eviction normally happens in a separate thread (or inline on the caller when the eviction thread
 * is disabled) and involves a single full scan of the map. It determines how many bytes must be
 * freed to reach the minimum size, and then while scanning determines the fewest
 * least-recently-used blocks necessary from each of the three priorities (so up to three times the
 * bytes to free are collected in total). It then uses the priority chunk sizes to evict fairly
 * according to the relative sizes and usage.
086 */
087@InterfaceAudience.Private
088public class LruBlockCache implements FirstLevelBlockCache {
089
  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Configuration key for the fraction of total size that eviction will evict down to; e.g. if set
   * to .8, then an eviction run keeps evicting until the cache size is down to 80% of the total.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Configuration key for the acceptable size of the cache, as a fraction of the total size. An
   * insert that pushes the size above this level triggers an eviction (no evictions if size &lt;
   * acceptable).
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.acceptable.factor";

  /**
   * Configuration key for the hard capacity limit of the cache: once the current size reaches this
   * factor times the acceptable size, new blocks are rejected instead of cached.
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.hard.capacity.limit.factor";
  /** Configuration key for the fraction of the total size given to single-access blocks. */
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.single.percentage";
  /** Configuration key for the fraction of the total size given to multiple-access blocks. */
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.multi.percentage";
  /** Configuration key for the fraction of the total size given to in-memory blocks. */
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key for "in-memory force mode". When enabled, eviction only touches the
   * in-memory bucket after the single- and multi-access buckets have been emptied, so in-memory
   * blocks stay cached as long as possible. Unlike inMemory, which is a column-family setting,
   * inMemoryForceMode is a cluster-wide setting.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
    "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets (fractions of the total size; the three must sum to 1.0) */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread */
  /** Period of the statistics logging task, in seconds (5 minutes). */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  /** Configuration key for the largest block heap size, in bytes, that will be cached. */
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  /** Default largest cacheable block heap size: 16 MiB. */
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  /**
   * Defined the cache map as {@link ConcurrentHashMap} here, because in
   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
   * (key, func). Besides, the func method must execute exactly once only when the key is present
   * and under the lock context, otherwise the reference count will be messed up. Notice that the
   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
   */
  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

  /** Eviction lock; held (via tryLock) for the duration of an eviction run. */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Largest block heap size, in bytes, accepted by cacheBlock; larger blocks are rejected. */
  private final long maxBlockSize;

  /**
   * True while an eviction run holds the eviction lock. Read without the lock to avoid requesting
   * a new eviction while one is already running.
   */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread; null when the cache was built with the eviction thread disabled. */
  private transient final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache, in heap bytes (starts at the structure overhead). */
  private final AtomicLong size;

  /** Current heap size of cached data blocks. */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /**
   * Cache access count (sequential ID). Each insert and each hit takes the next value as the
   * block's access time.
   */
  private final AtomicLong count;

  /** Hard capacity limit, as a multiple of the acceptable size; puts beyond it are rejected. */
  private float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /**
   * Maximum allowable size of cache, in bytes; can be changed at runtime via setMaxSize.
   * NOTE(review): not volatile even though setMaxSize may be invoked from another thread than the
   * eviction thread that reads it — confirm visibility requirements.
   */
  private long maxSize;

  /** Approximate block size, in bytes; used only to pre-size eviction data structures. */
  private long blockSize;

  /** Acceptable size of cache, as a fraction of maxSize (no evictions if size &lt; acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache, as a fraction of maxSize (when evicting, evict until size &lt; min) */
  private float minFactor;

  /** Fraction of maxSize for the single-access bucket */
  private float singleFactor;

  /** Fraction of maxSize for the multiple-access bucket */
  private float multiFactor;

  /** Fraction of maxSize for the in-memory bucket */
  private float memoryFactor;

  /** Overhead of the structure itself, in bytes */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
   * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;
226
227  /**
228   * Default constructor. Specify maximum size and expected average block size (approximation is
229   * fine).
230   * <p>
231   * All other factors will be calculated based on defaults specified in this class.
232   * @param maxSize   maximum size of cache, in bytes
233   * @param blockSize approximate size of each block, in bytes
234   */
235  public LruBlockCache(long maxSize, long blockSize) {
236    this(maxSize, blockSize, true);
237  }
238
239  /**
240   * Constructor used for testing. Allows disabling of the eviction thread.
241   */
242  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
243    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
244      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
245      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR,
246      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE);
247  }
248
249  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
250    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
251      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
252      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
253      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
254      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
255      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
256      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
257      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
258      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
259      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
260  }
261
262  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
263    this(maxSize, blockSize, true, conf);
264  }
265
266  /**
267   * Configurable constructor. Use this constructor if not using defaults.
268   * @param maxSize             maximum size of this cache, in bytes
269   * @param blockSize           expected average size of blocks, in bytes
270   * @param evictionThread      whether to run evictions in a bg thread or not
271   * @param mapInitialSize      initial size of backing ConcurrentHashMap
272   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
273   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
274   * @param minFactor           percentage of total size that eviction will evict until
275   * @param acceptableFactor    percentage of total size that triggers eviction
276   * @param singleFactor        percentage of total size for single-access blocks
277   * @param multiFactor         percentage of total size for multiple-access blocks
278   * @param memoryFactor        percentage of total size for in-memory blocks
279   */
280  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize,
281    float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor,
282    float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor,
283    boolean forceInMemory, long maxBlockSize) {
284    this.maxBlockSize = maxBlockSize;
285    if (
286      singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0
287        || memoryFactor < 0
288    ) {
289      throw new IllegalArgumentException(
290        "Single, multi, and memory factors " + " should be non-negative and total 1.0");
291    }
292    if (minFactor >= acceptableFactor) {
293      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
294    }
295    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
296      throw new IllegalArgumentException("all factors must be < 1");
297    }
298    this.maxSize = maxSize;
299    this.blockSize = blockSize;
300    this.forceInMemory = forceInMemory;
301    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
302    this.minFactor = minFactor;
303    this.acceptableFactor = acceptableFactor;
304    this.singleFactor = singleFactor;
305    this.multiFactor = multiFactor;
306    this.memoryFactor = memoryFactor;
307    this.stats = new CacheStats(this.getClass().getSimpleName());
308    this.count = new AtomicLong(0);
309    this.elements = new AtomicLong(0);
310    this.dataBlockElements = new LongAdder();
311    this.dataBlockSize = new LongAdder();
312    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
313    this.size = new AtomicLong(this.overhead);
314    this.hardCapacityLimitFactor = hardLimitFactor;
315    if (evictionThread) {
316      this.evictionThread = new EvictionThread(this);
317      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
318    } else {
319      this.evictionThread = null;
320    }
321    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
322    // every five minutes.
323    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
324      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
325  }
326
327  @Override
328  public void setVictimCache(BlockCache victimCache) {
329    if (victimHandler != null) {
330      throw new IllegalArgumentException("The victim cache has already been set");
331    }
332    victimHandler = requireNonNull(victimCache);
333  }
334
335  @Override
336  public void setMaxSize(long maxSize) {
337    this.maxSize = maxSize;
338    if (this.size.get() > acceptableSize() && !evictionInProgress) {
339      runEviction();
340    }
341  }
342
343  /**
344   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
345   * access will be more faster then off-heap, the small index block or meta block cached in
346   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
347   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
348   * heap size will be messed up. Here we will clone the block into an heap block if it's an
349   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
350   * the block (HBASE-22127): <br>
351   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
352   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
353   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
354   * JVM, so need a retain here.
355   * @param buf the original block
356   * @return an block with an heap memory backend.
357   */
358  private Cacheable asReferencedHeapBlock(Cacheable buf) {
359    if (buf instanceof HFileBlock) {
360      HFileBlock blk = ((HFileBlock) buf);
361      if (blk.isSharedMem()) {
362        return HFileBlock.deepCloneOnHeap(blk);
363      }
364    }
365    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
366    return buf.retain();
367  }
368
369  // BlockCache implementation
370
371  /**
372   * Cache the block with the specified name and buffer.
373   * <p>
374   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
375   * this can happen, for which we compare the buffer contents.
376   * @param cacheKey block's cache key
377   * @param buf      block buffer
378   * @param inMemory if block is in-memory
379   */
380  @Override
381  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
382    if (buf.heapSize() > maxBlockSize) {
383      // If there are a lot of blocks that are too
384      // big this can make the logs way too noisy.
385      // So we log 2%
386      if (stats.failInsert() % 50 == 0) {
387        LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ "
388          + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than "
389          + maxBlockSize);
390      }
391      return;
392    }
393
394    LruCachedBlock cb = map.get(cacheKey);
395    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
396      return;
397    }
398    long currentSize = size.get();
399    long currentAcceptableSize = acceptableSize();
400    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
401    if (currentSize >= hardLimitSize) {
402      stats.failInsert();
403      if (LOG.isTraceEnabled()) {
404        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
405          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
406          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
407          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
408      }
409      if (!evictionInProgress) {
410        runEviction();
411      }
412      return;
413    }
414    // Ensure that the block is an heap one.
415    buf = asReferencedHeapBlock(buf);
416    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
417    long newSize = updateSizeMetrics(cb, false);
418    map.put(cacheKey, cb);
419    long val = elements.incrementAndGet();
420    if (buf.getBlockType().isData()) {
421      dataBlockElements.increment();
422    }
423    if (LOG.isTraceEnabled()) {
424      long size = map.size();
425      assertCounterSanity(size, val);
426    }
427    if (newSize > currentAcceptableSize && !evictionInProgress) {
428      runEviction();
429    }
430  }
431
432  /**
433   * Sanity-checking for parity between actual block cache content and metrics. Intended only for
434   * use with TRACE level logging and -ea JVM.
435   */
436  private static void assertCounterSanity(long mapSize, long counterVal) {
437    if (counterVal < 0) {
438      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal
439        + ", mapSize=" + mapSize);
440      return;
441    }
442    if (mapSize < Integer.MAX_VALUE) {
443      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
444      if (pct_diff > 0.05) {
445        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal
446          + ", mapSize=" + mapSize);
447      }
448    }
449  }
450
451  /**
452   * Cache the block with the specified name and buffer.
453   * <p>
454   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
455   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
456   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
457   * otherwise the caching size is based on off-heap.
458   * @param cacheKey block's cache key
459   * @param buf      block buffer
460   */
461  @Override
462  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
463    cacheBlock(cacheKey, buf, false);
464  }
465
466  /**
467   * Helper function that updates the local size counter and also updates any per-cf or
468   * per-blocktype metrics it can discern from given {@link LruCachedBlock}
469   */
470  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
471    long heapsize = cb.heapSize();
472    BlockType bt = cb.getBuffer().getBlockType();
473    if (evict) {
474      heapsize *= -1;
475    }
476    if (bt != null && bt.isData()) {
477      dataBlockSize.add(heapsize);
478    }
479    return size.addAndGet(heapsize);
480  }
481
482  /**
483   * Get the buffer of the block with the specified name.
484   * @param cacheKey           block's cache key
485   * @param caching            true if the caller caches blocks on cache misses
486   * @param repeat             Whether this is a repeat lookup for the same block (used to avoid
487   *                           double counting cache misses when doing double-check locking)
488   * @param updateCacheMetrics Whether to update cache metrics or not
489   * @return buffer of specified cache key, or null if not in cache
490   */
491  @Override
492  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
493    boolean updateCacheMetrics) {
494    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
495      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
496      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
497      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
498      // see HBASE-22422.
499      val.getBuffer().retain();
500      return val;
501    });
502    if (cb == null) {
503      if (!repeat && updateCacheMetrics) {
504        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
505      }
506      // If there is another block cache then try and read there.
507      // However if this is a retry ( second time in double checked locking )
508      // And it's already a miss then the l2 will also be a miss.
509      if (victimHandler != null && !repeat) {
510        // The handler will increase result's refCnt for RPC, so need no extra retain.
511        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
512        // Promote this to L1.
513        if (result != null) {
514          if (caching) {
515            cacheBlock(cacheKey, result, /* inMemory = */ false);
516          }
517        }
518        return result;
519      }
520      return null;
521    }
522    if (updateCacheMetrics) {
523      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
524    }
525    cb.access(count.incrementAndGet());
526    return cb.getBuffer();
527  }
528
529  /**
530   * Whether the cache contains block with specified cacheKey
531   * @return true if contains the block
532   */
533  @Override
534  public boolean containsBlock(BlockCacheKey cacheKey) {
535    return map.containsKey(cacheKey);
536  }
537
538  @Override
539  public boolean evictBlock(BlockCacheKey cacheKey) {
540    LruCachedBlock cb = map.get(cacheKey);
541    return cb != null && evictBlock(cb, false) > 0;
542  }
543
544  /**
545   * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a
546   * linear-time search through all blocks in the cache. Ideally this should be a search in a
547   * log-access-time map.
548   * <p>
549   * This is used for evict-on-close to remove all blocks of a specific HFile.
550   * @return the number of blocks evicted
551   */
552  @Override
553  public int evictBlocksByHfileName(String hfileName) {
554    int numEvicted = 0;
555    for (BlockCacheKey key : map.keySet()) {
556      if (key.getHfileName().equals(hfileName)) {
557        if (evictBlock(key)) {
558          ++numEvicted;
559        }
560      }
561    }
562    if (victimHandler != null) {
563      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
564    }
565    return numEvicted;
566  }
567
568  /**
569   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp; block may be
570   * read again later
571   * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread
572   * @return the heap size of evicted block
573   */
574  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
575    LruCachedBlock previous = map.remove(block.getCacheKey());
576    if (previous == null) {
577      return 0;
578    }
579    updateSizeMetrics(block, true);
580    long val = elements.decrementAndGet();
581    if (LOG.isTraceEnabled()) {
582      long size = map.size();
583      assertCounterSanity(size, val);
584    }
585    if (block.getBuffer().getBlockType().isData()) {
586      dataBlockElements.decrement();
587    }
588    if (evictedByEvictionProcess) {
589      // When the eviction of the block happened because of invalidation of HFiles, no need to
590      // update the stats counter.
591      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
592      if (victimHandler != null) {
593        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
594      }
595    }
596    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
597    // NOT move this up because if do that then the victimHandler may access the buffer with
598    // refCnt = 0 which is disallowed.
599    previous.getBuffer().release();
600    return block.heapSize();
601  }
602
603  /**
604   * Multi-threaded call to run the eviction process.
605   */
606  private void runEviction() {
607    if (evictionThread == null || !evictionThread.isGo()) {
608      evict();
609    } else {
610      evictionThread.evict();
611    }
612  }
613
614  boolean isEvictionInProgress() {
615    return evictionInProgress;
616  }
617
618  long getOverhead() {
619    return overhead;
620  }
621
  /**
   * Runs one eviction pass on the calling thread.
   * <p>
   * If another thread already holds the eviction lock, this returns immediately without doing
   * anything. Otherwise it computes {@code bytesToFree = size - minSize()}, sorts every cached
   * block into its priority bucket (single, multi, memory), and frees bytes from the buckets using
   * one of two strategies:
   * <ul>
   * <li>in-memory force mode (or a memory factor of effectively 1.0): the in-memory bucket is only
   * touched when emptying the single and multi buckets is not enough; otherwise single and multi
   * are trimmed aiming at a 1:2 single:multi ratio of what remains;</li>
   * <li>otherwise: each bucket in turn frees at most its overflow and at most an even share of the
   * bytes still to free.</li>
   * </ul>
   * {@code stats.evict()} is recorded in the {@code finally} block, so every pass that acquired the
   * lock is counted, including passes that found nothing to free.
   */
  void evict() {

    // Ensure only one eviction at a time; a concurrent request simply returns.
    if (!evictionLock.tryLock()) {
      return;
    }

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free "
          + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
      }

      // Already at or below the minimum size: nothing to do (the finally block still runs).
      if (bytesToFree <= 0) {
        return;
      }

      // Instantiate priority buckets; each is sized for up to bytesToFree worth of candidates.
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      // A memory factor of (effectively) 1.0 is treated the same as in-memory force mode.
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace(
              "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace(
              "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          // bytesRemain is what single + multi will hold after eviction; the 1:2 target is
          // therefore bytesRemain / 3 for single and 2 * bytesRemain / 3 for multi.
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks: shrink single down to its 1:2 target first,
            // then take whatever is still missing from multi.
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // Poll order comes from BlockBucket.compareTo (defined outside this view).
        // NOTE(review): presumably smallest overflow first, so unused shares carry over to the
        // buckets that overflow most — confirm against BlockBucket.compareTo.
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            // Free at most this bucket's overflow and at most an even share of what is still owed.
            long bucketBytesToFree =
              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace(
          "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed)
            + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single="
            + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", "
            + "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      // Runs on every path that acquired the lock, including the early "nothing to free" return.
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
744
745  @Override
746  public String toString() {
747    return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
748      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
749      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
750      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
751      .add("heapSize", StringUtils.byteDesc(heapSize()))
752      .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor)
753      .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor)
754      .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor)
755      .toString();
756  }
757
758  /**
759   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority
760   * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of
761   * elements out of each according to configuration parameters and their relatives sizes.
762   */
763  private class BlockBucket implements Comparable<BlockBucket> {
764
765    private final String name;
766    private LruCachedBlockQueue queue;
767    private long totalSize = 0;
768    private long bucketSize;
769
770    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
771      this.name = name;
772      this.bucketSize = bucketSize;
773      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
774      totalSize = 0;
775    }
776
777    public void add(LruCachedBlock block) {
778      totalSize += block.heapSize();
779      queue.add(block);
780    }
781
782    public long free(long toFree) {
783      if (LOG.isTraceEnabled()) {
784        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
785      }
786      LruCachedBlock cb;
787      long freedBytes = 0;
788      while ((cb = queue.pollLast()) != null) {
789        freedBytes += evictBlock(cb, true);
790        if (freedBytes >= toFree) {
791          return freedBytes;
792        }
793      }
794      if (LOG.isTraceEnabled()) {
795        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
796      }
797      return freedBytes;
798    }
799
800    public long overflow() {
801      return totalSize - bucketSize;
802    }
803
804    public long totalSize() {
805      return totalSize;
806    }
807
808    @Override
809    public int compareTo(BlockBucket that) {
810      return Long.compare(this.overflow(), that.overflow());
811    }
812
813    @Override
814    public boolean equals(Object that) {
815      if (that == null || !(that instanceof BlockBucket)) {
816        return false;
817      }
818      return compareTo((BlockBucket) that) == 0;
819    }
820
821    @Override
822    public int hashCode() {
823      return Objects.hashCode(name, bucketSize, queue, totalSize);
824    }
825
826    @Override
827    public String toString() {
828      return MoreObjects.toStringHelper(this).add("name", name)
829        .add("totalSize", StringUtils.byteDesc(totalSize))
830        .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString();
831    }
832  }
833
834  /**
835   * Get the maximum size of this cache.
836   * @return max size in bytes
837   */
838
839  @Override
840  public long getMaxSize() {
841    return this.maxSize;
842  }
843
844  @Override
845  public long getCurrentSize() {
846    return this.size.get();
847  }
848
849  @Override
850  public long getCurrentDataSize() {
851    return this.dataBlockSize.sum();
852  }
853
854  @Override
855  public long getFreeSize() {
856    return getMaxSize() - getCurrentSize();
857  }
858
859  @Override
860  public long size() {
861    return getMaxSize();
862  }
863
864  @Override
865  public long getBlockCount() {
866    return this.elements.get();
867  }
868
869  @Override
870  public long getDataBlockCount() {
871    return this.dataBlockElements.sum();
872  }
873
874  EvictionThread getEvictionThread() {
875    return this.evictionThread;
876  }
877
878  /*
879   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows
880   * above the acceptable level.<p> Thread is triggered into action by {@link
881   * LruBlockCache#runEviction()}
882   */
883  static class EvictionThread extends Thread {
884
885    private WeakReference<LruBlockCache> cache;
886    private volatile boolean go = true;
887    // flag set after enter the run method, used for test
888    private boolean enteringRun = false;
889
890    public EvictionThread(LruBlockCache cache) {
891      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
892      setDaemon(true);
893      this.cache = new WeakReference<>(cache);
894    }
895
896    @Override
897    public void run() {
898      enteringRun = true;
899      while (this.go) {
900        synchronized (this) {
901          try {
902            this.wait(1000 * 10/* Don't wait for ever */);
903          } catch (InterruptedException e) {
904            LOG.warn("Interrupted eviction thread ", e);
905            Thread.currentThread().interrupt();
906          }
907        }
908        LruBlockCache cache = this.cache.get();
909        if (cache == null) {
910          this.go = false;
911          break;
912        }
913        cache.evict();
914      }
915    }
916
917    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
918        justification = "This is what we want")
919    public void evict() {
920      synchronized (this) {
921        this.notifyAll();
922      }
923    }
924
925    synchronized void shutdown() {
926      this.go = false;
927      this.notifyAll();
928    }
929
930    public boolean isGo() {
931      return go;
932    }
933
934    /**
935     * Used for the test.
936     */
937    boolean isEnteringRun() {
938      return this.enteringRun;
939    }
940  }
941
942  /*
943   * Statistics thread. Periodically prints the cache statistics to the log.
944   */
945  static class StatisticsThread extends Thread {
946
947    private final LruBlockCache lru;
948
949    public StatisticsThread(LruBlockCache lru) {
950      super("LruBlockCacheStats");
951      setDaemon(true);
952      this.lru = lru;
953    }
954
955    @Override
956    public void run() {
957      lru.logStats();
958    }
959  }
960
961  public void logStats() {
962    // Log size
963    long totalSize = heapSize();
964    long freeSize = maxSize - totalSize;
965    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize="
966      + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", "
967      + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", "
968      + "hits=" + stats.getHitCount() + ", " + "hitRatio="
969      + (stats.getHitCount() == 0
970        ? "0"
971        : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", "))
972      + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits="
973      + stats.getHitCachingCount() + ", " + "cachingHitsRatio="
974      + (stats.getHitCachingCount() == 0
975        ? "0,"
976        : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", "))
977      + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", "
978      + "evictedPerRun=" + stats.evictedPerEviction());
979  }
980
981  /**
982   * Get counter statistics for this cache.
983   * <p>
984   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
985   */
986  @Override
987  public CacheStats getStats() {
988    return this.stats;
989  }
990
991  public final static long CACHE_FIXED_OVERHEAD =
992    ClassSize.estimateBase(LruBlockCache.class, false);
993
994  @Override
995  public long heapSize() {
996    return getCurrentSize();
997  }
998
999  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1000    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1001    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1002      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1003      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1004  }
1005
1006  @Override
1007  public Iterator<CachedBlock> iterator() {
1008    final Iterator<LruCachedBlock> iterator = map.values().iterator();
1009
1010    return new Iterator<CachedBlock>() {
1011      private final long now = System.nanoTime();
1012
1013      @Override
1014      public boolean hasNext() {
1015        return iterator.hasNext();
1016      }
1017
1018      @Override
1019      public CachedBlock next() {
1020        final LruCachedBlock b = iterator.next();
1021        return new CachedBlock() {
1022          @Override
1023          public String toString() {
1024            return BlockCacheUtil.toString(this, now);
1025          }
1026
1027          @Override
1028          public BlockPriority getBlockPriority() {
1029            return b.getPriority();
1030          }
1031
1032          @Override
1033          public BlockType getBlockType() {
1034            return b.getBuffer().getBlockType();
1035          }
1036
1037          @Override
1038          public long getOffset() {
1039            return b.getCacheKey().getOffset();
1040          }
1041
1042          @Override
1043          public long getSize() {
1044            return b.getBuffer().heapSize();
1045          }
1046
1047          @Override
1048          public long getCachedTime() {
1049            return b.getCachedTime();
1050          }
1051
1052          @Override
1053          public String getFilename() {
1054            return b.getCacheKey().getHfileName();
1055          }
1056
1057          @Override
1058          public int compareTo(CachedBlock other) {
1059            int diff = this.getFilename().compareTo(other.getFilename());
1060            if (diff != 0) {
1061              return diff;
1062            }
1063            diff = Long.compare(this.getOffset(), other.getOffset());
1064            if (diff != 0) {
1065              return diff;
1066            }
1067            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1068              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1069            }
1070            return Long.compare(other.getCachedTime(), this.getCachedTime());
1071          }
1072
1073          @Override
1074          public int hashCode() {
1075            return b.hashCode();
1076          }
1077
1078          @Override
1079          public boolean equals(Object obj) {
1080            if (obj instanceof CachedBlock) {
1081              CachedBlock cb = (CachedBlock) obj;
1082              return compareTo(cb) == 0;
1083            } else {
1084              return false;
1085            }
1086          }
1087        };
1088      }
1089
1090      @Override
1091      public void remove() {
1092        throw new UnsupportedOperationException();
1093      }
1094    };
1095  }
1096
1097  // Simple calculators of sizes given factors and maxSize
1098
1099  long acceptableSize() {
1100    return (long) Math.floor(this.maxSize * this.acceptableFactor);
1101  }
1102
1103  private long minSize() {
1104    return (long) Math.floor(this.maxSize * this.minFactor);
1105  }
1106
1107  private long singleSize() {
1108    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1109  }
1110
1111  private long multiSize() {
1112    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1113  }
1114
1115  private long memorySize() {
1116    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1117  }
1118
1119  @Override
1120  public void shutdown() {
1121    if (victimHandler != null) {
1122      victimHandler.shutdown();
1123    }
1124    this.scheduleThreadPool.shutdown();
1125    for (int i = 0; i < 10; i++) {
1126      if (!this.scheduleThreadPool.isShutdown()) {
1127        try {
1128          Thread.sleep(10);
1129        } catch (InterruptedException e) {
1130          LOG.warn("Interrupted while sleeping");
1131          Thread.currentThread().interrupt();
1132          break;
1133        }
1134      }
1135    }
1136
1137    if (!this.scheduleThreadPool.isShutdown()) {
1138      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1139      LOG.debug("Still running " + runnables);
1140    }
1141    this.evictionThread.shutdown();
1142  }
1143
1144  /** Clears the cache. Used in tests. */
1145  public void clearCache() {
1146    this.map.clear();
1147    this.elements.set(0);
1148  }
1149
1150  /**
1151   * Used in testing. May be very inefficient.
1152   * @return the set of cached file names
1153   */
1154  SortedSet<String> getCachedFileNamesForTest() {
1155    SortedSet<String> fileNames = new TreeSet<>();
1156    for (BlockCacheKey cacheKey : map.keySet()) {
1157      fileNames.add(cacheKey.getHfileName());
1158    }
1159    return fileNames;
1160  }
1161
1162  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1163    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1164    for (LruCachedBlock block : map.values()) {
1165      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1166      Integer count = counts.get(encoding);
1167      counts.put(encoding, (count == null ? 0 : count) + 1);
1168    }
1169    return counts;
1170  }
1171
1172  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1173    return map;
1174  }
1175
1176  @Override
1177  public BlockCache[] getBlockCaches() {
1178    if (victimHandler != null) {
1179      return new BlockCache[] { this, this.victimHandler };
1180    }
1181    return null;
1182  }
1183}