001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.io.HeapSize;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
047import org.apache.hbase.thirdparty.com.google.common.base.Objects;
048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
049
050/**
051 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
052 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
053 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
054 * operations.
055 * <p>
056 * Contains three levels of block priority to allow for scan-resistance and in-memory families
057 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
058 * family is a column family that should be served from memory if possible): single-access,
059 * multiple-accesses, and in-memory priority. A block is added with an in-memory priority flag if
060 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
061 * single access priority the first time it is read into this block cache. If a block is accessed
062 * again while in cache, it is marked as a multiple access priority block. This delineation of
063 * blocks is used to prevent scans from thrashing the cache adding a least-frequently-used element
064 * to the eviction algorithm.
065 * <p>
066 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
067 * priority will retain close to its maximum size, however, if any priority is not using its entire
068 * chunk the others are able to grow beyond their chunk size.
069 * <p>
070 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
071 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
072 * is only used for pre-allocating data structures and in initial heap estimation of the map.
073 * <p>
074 * The detailed constructor defines the sizes for the three priorities (they should total to the
075 * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
076 * thread.
077 * <p>
078 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
079 * start. It evicts enough blocks to get the size below the minimum size specified.
080 * <p>
081 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines
082 * how many bytes must be freed to reach the minimum size, and then while scanning determines the
083 * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
084 * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
085 * sizes and usage.
086 */
087@InterfaceAudience.Private
088public class LruBlockCache implements FirstLevelBlockCache {
089
090  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
091
092  /**
093   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
094   * evicting during an eviction run till the cache size is down to 80% of the total.
095   */
096  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
097
098  /**
099   * Acceptable size of cache (no evictions if size < acceptable)
100   */
101  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
102    "hbase.lru.blockcache.acceptable.factor";
103
104  /**
105   * Hard capacity limit of cache, will reject any put if size > this * acceptable
106   */
107  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
108    "hbase.lru.blockcache.hard.capacity.limit.factor";
109  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
110    "hbase.lru.blockcache.single.percentage";
111  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
112    "hbase.lru.blockcache.multi.percentage";
113  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
114    "hbase.lru.blockcache.memory.percentage";
115
116  /**
117   * Configuration key to force data-block always (except in-memory are too much) cached in memory
118   * for in-memory hfile, unlike inMemory, which is a column-family configuration, inMemoryForceMode
119   * is a cluster-wide configuration
120   */
121  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
122    "hbase.lru.rs.inmemoryforcemode";
123
124  /* Default Configuration Parameters */
125
126  /* Backing Concurrent Map Configuration */
127  static final float DEFAULT_LOAD_FACTOR = 0.75f;
128  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129
130  /* Eviction thresholds */
131  private static final float DEFAULT_MIN_FACTOR = 0.95f;
132  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
133
134  /* Priority buckets */
135  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
137  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138
139  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
140
141  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
142
143  /* Statistics thread */
144  private static final int STAT_THREAD_PERIOD = 60 * 5;
145  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
146  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
147
148  /**
149   * Defined the cache map as {@link ConcurrentHashMap} here, because in
150   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
151   * (key, func). Besides, the func method must execute exactly once only when the key is present
152   * and under the lock context, otherwise the reference count will be messed up. Notice that the
153   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
154   */
155  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
156
157  /** Eviction lock (locked when eviction in process) */
158  private transient final ReentrantLock evictionLock = new ReentrantLock(true);
159
160  private final long maxBlockSize;
161
162  /** Volatile boolean to track if we are in an eviction process or not */
163  private volatile boolean evictionInProgress = false;
164
165  /** Eviction thread */
166  private transient final EvictionThread evictionThread;
167
168  /** Statistics thread schedule pool (for heavy debugging, could remove) */
169  private transient final ScheduledExecutorService scheduleThreadPool =
170    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
171      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
172
173  /** Current size of cache */
174  private final AtomicLong size;
175
176  /** Current size of data blocks */
177  private final LongAdder dataBlockSize = new LongAdder();
178
179  /** Current size of index blocks */
180  private final LongAdder indexBlockSize = new LongAdder();
181
182  /** Current size of bloom blocks */
183  private final LongAdder bloomBlockSize = new LongAdder();
184
185  /** Current number of cached elements */
186  private final AtomicLong elements;
187
188  /** Current number of cached data block elements */
189  private final LongAdder dataBlockElements = new LongAdder();
190
191  /** Current number of cached index block elements */
192  private final LongAdder indexBlockElements = new LongAdder();
193
194  /** Current number of cached bloom block elements */
195  private final LongAdder bloomBlockElements = new LongAdder();
196
197  /** Cache access count (sequential ID) */
198  private final AtomicLong count;
199
200  /** hard capacity limit */
201  private float hardCapacityLimitFactor;
202
203  /** Cache statistics */
204  private final CacheStats stats;
205
206  /** Maximum allowable size of cache (block put if size > max, evict) */
207  private long maxSize;
208
209  /** Approximate block size */
210  private long blockSize;
211
212  /** Acceptable size of cache (no evictions if size < acceptable) */
213  private float acceptableFactor;
214
215  /** Minimum threshold of cache (when evicting, evict until size < min) */
216  private float minFactor;
217
218  /** Single access bucket size */
219  private float singleFactor;
220
221  /** Multiple access bucket size */
222  private float multiFactor;
223
224  /** In-memory bucket size */
225  private float memoryFactor;
226
227  /** Overhead of the structure itself */
228  private long overhead;
229
230  /** Whether in-memory hfile's data block has higher priority when evicting */
231  private boolean forceInMemory;
232
233  /**
234   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
235   * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
236   */
237  private transient BlockCache victimHandler = null;
238
239  /**
240   * Default constructor. Specify maximum size and expected average block size (approximation is
241   * fine).
242   * <p>
243   * All other factors will be calculated based on defaults specified in this class.
244   * @param maxSize   maximum size of cache, in bytes
245   * @param blockSize approximate size of each block, in bytes
246   */
247  public LruBlockCache(long maxSize, long blockSize) {
248    this(maxSize, blockSize, true);
249  }
250
251  /**
252   * Constructor used for testing. Allows disabling of the eviction thread.
253   */
254  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
255    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
256      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
257      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR,
258      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE);
259  }
260
261  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
262    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
263      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
264      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
265      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
266      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
267      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
268      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
269      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
270      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
271      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
272  }
273
274  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
275    this(maxSize, blockSize, true, conf);
276  }
277
278  /**
279   * Configurable constructor. Use this constructor if not using defaults.
280   * @param maxSize             maximum size of this cache, in bytes
281   * @param blockSize           expected average size of blocks, in bytes
282   * @param evictionThread      whether to run evictions in a bg thread or not
283   * @param mapInitialSize      initial size of backing ConcurrentHashMap
284   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
285   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
286   * @param minFactor           percentage of total size that eviction will evict until
287   * @param acceptableFactor    percentage of total size that triggers eviction
288   * @param singleFactor        percentage of total size for single-access blocks
289   * @param multiFactor         percentage of total size for multiple-access blocks
290   * @param memoryFactor        percentage of total size for in-memory blocks
291   */
292  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize,
293    float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor,
294    float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor,
295    boolean forceInMemory, long maxBlockSize) {
296    this.maxBlockSize = maxBlockSize;
297    if (
298      singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0
299        || memoryFactor < 0
300    ) {
301      throw new IllegalArgumentException(
302        "Single, multi, and memory factors " + " should be non-negative and total 1.0");
303    }
304    if (minFactor >= acceptableFactor) {
305      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
306    }
307    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
308      throw new IllegalArgumentException("all factors must be < 1");
309    }
310    this.maxSize = maxSize;
311    this.blockSize = blockSize;
312    this.forceInMemory = forceInMemory;
313    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
314    this.minFactor = minFactor;
315    this.acceptableFactor = acceptableFactor;
316    this.singleFactor = singleFactor;
317    this.multiFactor = multiFactor;
318    this.memoryFactor = memoryFactor;
319    this.stats = new CacheStats(this.getClass().getSimpleName());
320    this.count = new AtomicLong(0);
321    this.elements = new AtomicLong(0);
322    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
323    this.size = new AtomicLong(this.overhead);
324    this.hardCapacityLimitFactor = hardLimitFactor;
325    if (evictionThread) {
326      this.evictionThread = new EvictionThread(this);
327      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
328    } else {
329      this.evictionThread = null;
330    }
331    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
332    // every five minutes.
333    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
334      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
335  }
336
337  @Override
338  public void setVictimCache(BlockCache victimCache) {
339    if (victimHandler != null) {
340      throw new IllegalArgumentException("The victim cache has already been set");
341    }
342    victimHandler = requireNonNull(victimCache);
343  }
344
345  @Override
346  public void setMaxSize(long maxSize) {
347    this.maxSize = maxSize;
348    if (this.size.get() > acceptableSize() && !evictionInProgress) {
349      runEviction();
350    }
351  }
352
353  /**
354   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
355   * access will be more faster then off-heap, the small index block or meta block cached in
356   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
357   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
358   * heap size will be messed up. Here we will clone the block into an heap block if it's an
359   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
360   * the block (HBASE-22127): <br>
361   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
362   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
363   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
364   * JVM, so need a retain here.
365   * @param buf the original block
366   * @return an block with an heap memory backend.
367   */
368  private Cacheable asReferencedHeapBlock(Cacheable buf) {
369    if (buf instanceof HFileBlock) {
370      HFileBlock blk = ((HFileBlock) buf);
371      if (blk.isSharedMem()) {
372        return HFileBlock.deepCloneOnHeap(blk);
373      }
374    }
375    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
376    return buf.retain();
377  }
378
379  // BlockCache implementation
380
381  /**
382   * Cache the block with the specified name and buffer.
383   * <p>
384   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
385   * this can happen, for which we compare the buffer contents.
386   * @param cacheKey block's cache key
387   * @param buf      block buffer
388   * @param inMemory if block is in-memory
389   */
390  @Override
391  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
392    if (buf.heapSize() > maxBlockSize) {
393      // If there are a lot of blocks that are too
394      // big this can make the logs way too noisy.
395      // So we log 2%
396      if (stats.failInsert() % 50 == 0) {
397        LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ "
398          + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than "
399          + maxBlockSize);
400      }
401      return;
402    }
403
404    LruCachedBlock cb = map.get(cacheKey);
405    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
406      return;
407    }
408    long currentSize = size.get();
409    long currentAcceptableSize = acceptableSize();
410    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
411    if (currentSize >= hardLimitSize) {
412      stats.failInsert();
413      if (LOG.isTraceEnabled()) {
414        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
415          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
416          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
417          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
418      }
419      if (!evictionInProgress) {
420        runEviction();
421      }
422      return;
423    }
424    // Ensure that the block is an heap one.
425    buf = asReferencedHeapBlock(buf);
426    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
427    long newSize = updateSizeMetrics(cb, false);
428    map.put(cacheKey, cb);
429    long val = elements.incrementAndGet();
430    if (buf.getBlockType().isBloom()) {
431      bloomBlockElements.increment();
432    } else if (buf.getBlockType().isIndex()) {
433      indexBlockElements.increment();
434    } else if (buf.getBlockType().isData()) {
435      dataBlockElements.increment();
436    }
437    if (LOG.isTraceEnabled()) {
438      long size = map.size();
439      assertCounterSanity(size, val);
440    }
441    if (newSize > currentAcceptableSize && !evictionInProgress) {
442      runEviction();
443    }
444  }
445
446  /**
447   * Sanity-checking for parity between actual block cache content and metrics. Intended only for
448   * use with TRACE level logging and -ea JVM.
449   */
450  private static void assertCounterSanity(long mapSize, long counterVal) {
451    if (counterVal < 0) {
452      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal
453        + ", mapSize=" + mapSize);
454      return;
455    }
456    if (mapSize < Integer.MAX_VALUE) {
457      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
458      if (pct_diff > 0.05) {
459        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal
460          + ", mapSize=" + mapSize);
461      }
462    }
463  }
464
465  /**
466   * Cache the block with the specified name and buffer.
467   * <p>
468   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
469   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
470   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
471   * otherwise the caching size is based on off-heap.
472   * @param cacheKey block's cache key
473   * @param buf      block buffer
474   */
475  @Override
476  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
477    cacheBlock(cacheKey, buf, false);
478  }
479
480  /**
481   * Helper function that updates the local size counter and also updates any per-cf or
482   * per-blocktype metrics it can discern from given {@link LruCachedBlock}
483   */
484  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
485    long heapsize = cb.heapSize();
486    BlockType bt = cb.getBuffer().getBlockType();
487    if (evict) {
488      heapsize *= -1;
489    }
490    if (bt != null) {
491      if (bt.isBloom()) {
492        bloomBlockSize.add(heapsize);
493      } else if (bt.isIndex()) {
494        indexBlockSize.add(heapsize);
495      } else if (bt.isData()) {
496        dataBlockSize.add(heapsize);
497      }
498    }
499    return size.addAndGet(heapsize);
500  }
501
502  /**
503   * Get the buffer of the block with the specified name.
504   * @param cacheKey           block's cache key
505   * @param caching            true if the caller caches blocks on cache misses
506   * @param repeat             Whether this is a repeat lookup for the same block (used to avoid
507   *                           double counting cache misses when doing double-check locking)
508   * @param updateCacheMetrics Whether to update cache metrics or not
509   * @return buffer of specified cache key, or null if not in cache
510   */
511  @Override
512  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
513    boolean updateCacheMetrics) {
514    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
515      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
516      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
517      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
518      // see HBASE-22422.
519      val.getBuffer().retain();
520      return val;
521    });
522    if (cb == null) {
523      if (!repeat && updateCacheMetrics) {
524        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
525      }
526      // If there is another block cache then try and read there.
527      // However if this is a retry ( second time in double checked locking )
528      // And it's already a miss then the l2 will also be a miss.
529      if (victimHandler != null && !repeat) {
530        // The handler will increase result's refCnt for RPC, so need no extra retain.
531        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
532        // Promote this to L1.
533        if (result != null) {
534          if (caching) {
535            cacheBlock(cacheKey, result, /* inMemory = */ false);
536          }
537        }
538        return result;
539      }
540      return null;
541    }
542    if (updateCacheMetrics) {
543      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
544    }
545    cb.access(count.incrementAndGet());
546    return cb.getBuffer();
547  }
548
549  /**
550   * Whether the cache contains block with specified cacheKey
551   * @return true if contains the block
552   */
553  @Override
554  public boolean containsBlock(BlockCacheKey cacheKey) {
555    return map.containsKey(cacheKey);
556  }
557
558  @Override
559  public boolean evictBlock(BlockCacheKey cacheKey) {
560    LruCachedBlock cb = map.get(cacheKey);
561    return cb != null && evictBlock(cb, false) > 0;
562  }
563
564  /**
565   * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a
566   * linear-time search through all blocks in the cache. Ideally this should be a search in a
567   * log-access-time map.
568   * <p>
569   * This is used for evict-on-close to remove all blocks of a specific HFile.
570   * @return the number of blocks evicted
571   */
572  @Override
573  public int evictBlocksByHfileName(String hfileName) {
574    int numEvicted = 0;
575    for (BlockCacheKey key : map.keySet()) {
576      if (key.getHfileName().equals(hfileName)) {
577        if (evictBlock(key)) {
578          ++numEvicted;
579        }
580      }
581    }
582    if (victimHandler != null) {
583      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
584    }
585    return numEvicted;
586  }
587
588  /**
589   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp; block may be
590   * read again later
591   * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread
592   * @return the heap size of evicted block
593   */
594  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
595    LruCachedBlock previous = map.remove(block.getCacheKey());
596    if (previous == null) {
597      return 0;
598    }
599    updateSizeMetrics(block, true);
600    long val = elements.decrementAndGet();
601    if (LOG.isTraceEnabled()) {
602      long size = map.size();
603      assertCounterSanity(size, val);
604    }
605    BlockType bt = block.getBuffer().getBlockType();
606    if (bt.isBloom()) {
607      bloomBlockElements.decrement();
608    } else if (bt.isIndex()) {
609      indexBlockElements.decrement();
610    } else if (bt.isData()) {
611      dataBlockElements.decrement();
612    }
613    if (evictedByEvictionProcess) {
614      // When the eviction of the block happened because of invalidation of HFiles, no need to
615      // update the stats counter.
616      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
617      if (victimHandler != null) {
618        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
619      }
620    }
621    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
622    // NOT move this up because if do that then the victimHandler may access the buffer with
623    // refCnt = 0 which is disallowed.
624    previous.getBuffer().release();
625    return block.heapSize();
626  }
627
628  /**
629   * Multi-threaded call to run the eviction process.
630   */
631  private void runEviction() {
632    if (evictionThread == null || !evictionThread.isGo()) {
633      evict();
634    } else {
635      evictionThread.evict();
636    }
637  }
638
639  boolean isEvictionInProgress() {
640    return evictionInProgress;
641  }
642
643  long getOverhead() {
644    return overhead;
645  }
646
647  /**
648   * Eviction method.
649   */
650  void evict() {
651
652    // Ensure only one eviction at a time
653    if (!evictionLock.tryLock()) {
654      return;
655    }
656
657    try {
658      evictionInProgress = true;
659      long currentSize = this.size.get();
660      long bytesToFree = currentSize - minSize();
661
662      if (LOG.isTraceEnabled()) {
663        LOG.trace("Block cache LRU eviction started; Attempting to free "
664          + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
665      }
666
667      if (bytesToFree <= 0) {
668        return;
669      }
670
671      // Instantiate priority buckets
672      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
673      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
674      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
675
676      // Scan entire map putting into appropriate buckets
677      for (LruCachedBlock cachedBlock : map.values()) {
678        switch (cachedBlock.getPriority()) {
679          case SINGLE: {
680            bucketSingle.add(cachedBlock);
681            break;
682          }
683          case MULTI: {
684            bucketMulti.add(cachedBlock);
685            break;
686          }
687          case MEMORY: {
688            bucketMemory.add(cachedBlock);
689            break;
690          }
691        }
692      }
693
694      long bytesFreed = 0;
695      if (forceInMemory || memoryFactor > 0.999f) {
696        long s = bucketSingle.totalSize();
697        long m = bucketMulti.totalSize();
698        if (bytesToFree > (s + m)) {
699          // this means we need to evict blocks in memory bucket to make room,
700          // so the single and multi buckets will be emptied
701          bytesFreed = bucketSingle.free(s);
702          bytesFreed += bucketMulti.free(m);
703          if (LOG.isTraceEnabled()) {
704            LOG.trace(
705              "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
706          }
707          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
708          if (LOG.isTraceEnabled()) {
709            LOG.trace(
710              "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
711          }
712        } else {
713          // this means no need to evict block in memory bucket,
714          // and we try best to make the ratio between single-bucket and
715          // multi-bucket is 1:2
716          long bytesRemain = s + m - bytesToFree;
717          if (3 * s <= bytesRemain) {
718            // single-bucket is small enough that no eviction happens for it
719            // hence all eviction goes from multi-bucket
720            bytesFreed = bucketMulti.free(bytesToFree);
721          } else if (3 * m <= 2 * bytesRemain) {
722            // multi-bucket is small enough that no eviction happens for it
723            // hence all eviction goes from single-bucket
724            bytesFreed = bucketSingle.free(bytesToFree);
725          } else {
726            // both buckets need to evict some blocks
727            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
728            if (bytesFreed < bytesToFree) {
729              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
730            }
731          }
732        }
733      } else {
734        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
735
736        bucketQueue.add(bucketSingle);
737        bucketQueue.add(bucketMulti);
738        bucketQueue.add(bucketMemory);
739
740        int remainingBuckets = bucketQueue.size();
741
742        BlockBucket bucket;
743        while ((bucket = bucketQueue.poll()) != null) {
744          long overflow = bucket.overflow();
745          if (overflow > 0) {
746            long bucketBytesToFree =
747              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
748            bytesFreed += bucket.free(bucketBytesToFree);
749          }
750          remainingBuckets--;
751        }
752      }
753      if (LOG.isTraceEnabled()) {
754        long single = bucketSingle.totalSize();
755        long multi = bucketMulti.totalSize();
756        long memory = bucketMemory.totalSize();
757        LOG.trace(
758          "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed)
759            + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single="
760            + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", "
761            + "memory=" + StringUtils.byteDesc(memory));
762      }
763    } finally {
764      stats.evict();
765      evictionInProgress = false;
766      evictionLock.unlock();
767    }
768  }
769
770  @Override
771  public String toString() {
772    return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
773      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
774      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
775      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
776      .add("heapSize", StringUtils.byteDesc(heapSize()))
777      .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor)
778      .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor)
779      .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor)
780      .toString();
781  }
782
783  /**
784   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority
785   * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of
786   * elements out of each according to configuration parameters and their relatives sizes.
787   */
788  private class BlockBucket implements Comparable<BlockBucket> {
789
790    private final String name;
791    private LruCachedBlockQueue queue;
792    private long totalSize = 0;
793    private long bucketSize;
794
795    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
796      this.name = name;
797      this.bucketSize = bucketSize;
798      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
799      totalSize = 0;
800    }
801
802    public void add(LruCachedBlock block) {
803      totalSize += block.heapSize();
804      queue.add(block);
805    }
806
807    public long free(long toFree) {
808      if (LOG.isTraceEnabled()) {
809        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
810      }
811      LruCachedBlock cb;
812      long freedBytes = 0;
813      while ((cb = queue.pollLast()) != null) {
814        freedBytes += evictBlock(cb, true);
815        if (freedBytes >= toFree) {
816          return freedBytes;
817        }
818      }
819      if (LOG.isTraceEnabled()) {
820        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
821      }
822      return freedBytes;
823    }
824
825    public long overflow() {
826      return totalSize - bucketSize;
827    }
828
829    public long totalSize() {
830      return totalSize;
831    }
832
833    @Override
834    public int compareTo(BlockBucket that) {
835      return Long.compare(this.overflow(), that.overflow());
836    }
837
838    @Override
839    public boolean equals(Object that) {
840      if (that == null || !(that instanceof BlockBucket)) {
841        return false;
842      }
843      return compareTo((BlockBucket) that) == 0;
844    }
845
846    @Override
847    public int hashCode() {
848      return Objects.hashCode(name, bucketSize, queue, totalSize);
849    }
850
851    @Override
852    public String toString() {
853      return MoreObjects.toStringHelper(this).add("name", name)
854        .add("totalSize", StringUtils.byteDesc(totalSize))
855        .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString();
856    }
857  }
858
859  /**
860   * Get the maximum size of this cache.
861   * @return max size in bytes
862   */
863
864  @Override
865  public long getMaxSize() {
866    return this.maxSize;
867  }
868
869  @Override
870  public long getCurrentSize() {
871    return this.size.get();
872  }
873
874  @Override
875  public long getCurrentDataSize() {
876    return this.dataBlockSize.sum();
877  }
878
879  public long getCurrentIndexSize() {
880    return this.indexBlockSize.sum();
881  }
882
883  public long getCurrentBloomSize() {
884    return this.bloomBlockSize.sum();
885  }
886
887  @Override
888  public long getFreeSize() {
889    return getMaxSize() - getCurrentSize();
890  }
891
892  @Override
893  public long size() {
894    return getMaxSize();
895  }
896
897  @Override
898  public long getBlockCount() {
899    return this.elements.get();
900  }
901
902  @Override
903  public long getDataBlockCount() {
904    return this.dataBlockElements.sum();
905  }
906
907  public long getIndexBlockCount() {
908    return this.indexBlockElements.sum();
909  }
910
911  public long getBloomBlockCount() {
912    return this.bloomBlockElements.sum();
913  }
914
915  EvictionThread getEvictionThread() {
916    return this.evictionThread;
917  }
918
919  /*
920   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows
921   * above the acceptable level.<p> Thread is triggered into action by {@link
922   * LruBlockCache#runEviction()}
923   */
924  static class EvictionThread extends Thread {
925
926    private WeakReference<LruBlockCache> cache;
927    private volatile boolean go = true;
928    // flag set after enter the run method, used for test
929    private boolean enteringRun = false;
930
931    public EvictionThread(LruBlockCache cache) {
932      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
933      setDaemon(true);
934      this.cache = new WeakReference<>(cache);
935    }
936
937    @Override
938    public void run() {
939      enteringRun = true;
940      while (this.go) {
941        synchronized (this) {
942          try {
943            this.wait(1000 * 10/* Don't wait for ever */);
944          } catch (InterruptedException e) {
945            LOG.warn("Interrupted eviction thread ", e);
946            Thread.currentThread().interrupt();
947          }
948        }
949        LruBlockCache cache = this.cache.get();
950        if (cache == null) {
951          this.go = false;
952          break;
953        }
954        cache.evict();
955      }
956    }
957
958    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
959        justification = "This is what we want")
960    public void evict() {
961      synchronized (this) {
962        this.notifyAll();
963      }
964    }
965
966    synchronized void shutdown() {
967      this.go = false;
968      this.notifyAll();
969    }
970
971    public boolean isGo() {
972      return go;
973    }
974
975    /**
976     * Used for the test.
977     */
978    boolean isEnteringRun() {
979      return this.enteringRun;
980    }
981  }
982
983  /*
984   * Statistics thread. Periodically prints the cache statistics to the log.
985   */
986  static class StatisticsThread extends Thread {
987
988    private final LruBlockCache lru;
989
990    public StatisticsThread(LruBlockCache lru) {
991      super("LruBlockCacheStats");
992      setDaemon(true);
993      this.lru = lru;
994    }
995
996    @Override
997    public void run() {
998      lru.logStats();
999    }
1000  }
1001
1002  public void logStats() {
1003    // Log size
1004    long totalSize = heapSize();
1005    long freeSize = maxSize - totalSize;
1006    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize="
1007      + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", "
1008      + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", "
1009      + "hits=" + stats.getHitCount() + ", " + "hitRatio="
1010      + (stats.getHitCount() == 0
1011        ? "0"
1012        : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", "))
1013      + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits="
1014      + stats.getHitCachingCount() + ", " + "cachingHitsRatio="
1015      + (stats.getHitCachingCount() == 0
1016        ? "0,"
1017        : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", "))
1018      + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", "
1019      + "evictedPerRun=" + stats.evictedPerEviction());
1020  }
1021
1022  /**
1023   * Get counter statistics for this cache.
1024   * <p>
1025   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
1026   */
1027  @Override
1028  public CacheStats getStats() {
1029    return this.stats;
1030  }
1031
1032  public final static long CACHE_FIXED_OVERHEAD =
1033    ClassSize.estimateBase(LruBlockCache.class, false);
1034
1035  @Override
1036  public long heapSize() {
1037    return getCurrentSize();
1038  }
1039
1040  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1041    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1042    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1043      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1044      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1045  }
1046
1047  @Override
1048  public Iterator<CachedBlock> iterator() {
1049    final Iterator<LruCachedBlock> iterator = map.values().iterator();
1050
1051    return new Iterator<CachedBlock>() {
1052      private final long now = System.nanoTime();
1053
1054      @Override
1055      public boolean hasNext() {
1056        return iterator.hasNext();
1057      }
1058
1059      @Override
1060      public CachedBlock next() {
1061        final LruCachedBlock b = iterator.next();
1062        return new CachedBlock() {
1063          @Override
1064          public String toString() {
1065            return BlockCacheUtil.toString(this, now);
1066          }
1067
1068          @Override
1069          public BlockPriority getBlockPriority() {
1070            return b.getPriority();
1071          }
1072
1073          @Override
1074          public BlockType getBlockType() {
1075            return b.getBuffer().getBlockType();
1076          }
1077
1078          @Override
1079          public long getOffset() {
1080            return b.getCacheKey().getOffset();
1081          }
1082
1083          @Override
1084          public long getSize() {
1085            return b.getBuffer().heapSize();
1086          }
1087
1088          @Override
1089          public long getCachedTime() {
1090            return b.getCachedTime();
1091          }
1092
1093          @Override
1094          public String getFilename() {
1095            return b.getCacheKey().getHfileName();
1096          }
1097
1098          @Override
1099          public int compareTo(CachedBlock other) {
1100            int diff = this.getFilename().compareTo(other.getFilename());
1101            if (diff != 0) {
1102              return diff;
1103            }
1104            diff = Long.compare(this.getOffset(), other.getOffset());
1105            if (diff != 0) {
1106              return diff;
1107            }
1108            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1109              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1110            }
1111            return Long.compare(other.getCachedTime(), this.getCachedTime());
1112          }
1113
1114          @Override
1115          public int hashCode() {
1116            return b.hashCode();
1117          }
1118
1119          @Override
1120          public boolean equals(Object obj) {
1121            if (obj instanceof CachedBlock) {
1122              CachedBlock cb = (CachedBlock) obj;
1123              return compareTo(cb) == 0;
1124            } else {
1125              return false;
1126            }
1127          }
1128        };
1129      }
1130
1131      @Override
1132      public void remove() {
1133        throw new UnsupportedOperationException();
1134      }
1135    };
1136  }
1137
1138  // Simple calculators of sizes given factors and maxSize
1139
1140  long acceptableSize() {
1141    return (long) Math.floor(this.maxSize * this.acceptableFactor);
1142  }
1143
1144  private long minSize() {
1145    return (long) Math.floor(this.maxSize * this.minFactor);
1146  }
1147
1148  private long singleSize() {
1149    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1150  }
1151
1152  private long multiSize() {
1153    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1154  }
1155
1156  private long memorySize() {
1157    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1158  }
1159
1160  @Override
1161  public void shutdown() {
1162    if (victimHandler != null) {
1163      victimHandler.shutdown();
1164    }
1165    this.scheduleThreadPool.shutdown();
1166    for (int i = 0; i < 10; i++) {
1167      if (!this.scheduleThreadPool.isShutdown()) {
1168        try {
1169          Thread.sleep(10);
1170        } catch (InterruptedException e) {
1171          LOG.warn("Interrupted while sleeping");
1172          Thread.currentThread().interrupt();
1173          break;
1174        }
1175      }
1176    }
1177
1178    if (!this.scheduleThreadPool.isShutdown()) {
1179      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1180      LOG.debug("Still running " + runnables);
1181    }
1182    this.evictionThread.shutdown();
1183  }
1184
1185  /** Clears the cache. Used in tests. */
1186  public void clearCache() {
1187    this.map.clear();
1188    this.elements.set(0);
1189  }
1190
1191  /**
1192   * Used in testing. May be very inefficient.
1193   * @return the set of cached file names
1194   */
1195  SortedSet<String> getCachedFileNamesForTest() {
1196    SortedSet<String> fileNames = new TreeSet<>();
1197    for (BlockCacheKey cacheKey : map.keySet()) {
1198      fileNames.add(cacheKey.getHfileName());
1199    }
1200    return fileNames;
1201  }
1202
1203  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1204    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1205    for (LruCachedBlock block : map.values()) {
1206      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1207      Integer count = counts.get(encoding);
1208      counts.put(encoding, (count == null ? 0 : count) + 1);
1209    }
1210    return counts;
1211  }
1212
1213  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1214    return map;
1215  }
1216
1217  @Override
1218  public BlockCache[] getBlockCaches() {
1219    if (victimHandler != null) {
1220      return new BlockCache[] { this, this.victimHandler };
1221    }
1222    return null;
1223  }
1224}