001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.commons.lang3.mutable.MutableBoolean;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.io.HeapSize;
040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
041import org.apache.hadoop.hbase.util.ClassSize;
042import org.apache.hadoop.util.StringUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
048import org.apache.hbase.thirdparty.com.google.common.base.Objects;
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
050
051/**
052 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
053 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
054 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
055 * operations.
056 * </p>
057 * Contains three levels of block priority to allow for scan-resistance and in-memory families
058 * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An
059 * in-memory column family is a column family that should be served from memory if possible):
060 * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory
061 * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()},
062 * otherwise a block becomes a single access priority the first time it is read into this block
063 * cache. If a block is accessed again while in cache, it is marked as a multiple access priority
064 * block. This delineation of blocks is used to prevent scans from thrashing the cache adding a
065 * least-frequently-used element to the eviction algorithm.
066 * <p/>
067 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
068 * priority will retain close to its maximum size, however, if any priority is not using its entire
069 * chunk the others are able to grow beyond their chunk size.
070 * <p/>
071 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
072 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
073 * is only used for pre-allocating data structures and in initial heap estimation of the map.
074 * <p/>
075 * The detailed constructor defines the sizes for the three priorities (they should total to the
076 * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
077 * thread.
078 * <p/>
079 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
080 * start. It evicts enough blocks to get the size below the minimum size specified.
081 * <p/>
082 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines
083 * how many bytes must be freed to reach the minimum size, and then while scanning determines the
084 * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
085 * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
086 * sizes and usage.
087 */
088@InterfaceAudience.Private
089public class LruBlockCache implements FirstLevelBlockCache {
090
091  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
092
093  /**
094   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
095   * evicting during an eviction run till the cache size is down to 80% of the total.
096   */
097  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
098
099  /**
100   * Acceptable size of cache (no evictions if size < acceptable)
101   */
102  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
103    "hbase.lru.blockcache.acceptable.factor";
104
105  /**
106   * Hard capacity limit of cache, will reject any put if size > this * acceptable
107   */
108  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
109    "hbase.lru.blockcache.hard.capacity.limit.factor";
110  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
111    "hbase.lru.blockcache.single.percentage";
112  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
113    "hbase.lru.blockcache.multi.percentage";
114  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
115    "hbase.lru.blockcache.memory.percentage";
116
117  /**
118   * Configuration key to force data-block always (except in-memory are too much) cached in memory
119   * for in-memory hfile, unlike inMemory, which is a column-family configuration, inMemoryForceMode
120   * is a cluster-wide configuration
121   */
122  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
123    "hbase.lru.rs.inmemoryforcemode";
124
125  /* Default Configuration Parameters */
126
127  /* Backing Concurrent Map Configuration */
128  static final float DEFAULT_LOAD_FACTOR = 0.75f;
129  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
130
131  /* Eviction thresholds */
132  private static final float DEFAULT_MIN_FACTOR = 0.95f;
133  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
134
135  /* Priority buckets */
136  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
137  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
138  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
139
140  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
141
142  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
143
144  /* Statistics thread */
145  private static final int STAT_THREAD_PERIOD = 60 * 5;
146  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
147  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
148
149  /**
150   * Defined the cache map as {@link ConcurrentHashMap} here, because in
151   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#k (key, func).
152   * Besides, the func method must execute exactly once only when the key is present and under the
153   * lock context, otherwise the reference count will be messed up. Notice that the
154   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. Some code using
155   * #computeIfPresent also expects the supplier to be executed only once. ConcurrentHashMap can
156   * guarantee that. Other types may not.
157   */
158  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
159
160  /** Eviction lock (locked when eviction in process) */
161  private transient final ReentrantLock evictionLock = new ReentrantLock(true);
162
163  private final long maxBlockSize;
164
165  /** Volatile boolean to track if we are in an eviction process or not */
166  private volatile boolean evictionInProgress = false;
167
168  /** Eviction thread */
169  private transient final EvictionThread evictionThread;
170
171  /** Statistics thread schedule pool (for heavy debugging, could remove) */
172  private transient final ScheduledExecutorService scheduleThreadPool =
173    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
174      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
175
176  /** Current size of cache */
177  private final AtomicLong size;
178
179  /** Current size of data blocks */
180  private final LongAdder dataBlockSize = new LongAdder();
181
182  /** Current size of index blocks */
183  private final LongAdder indexBlockSize = new LongAdder();
184
185  /** Current size of bloom blocks */
186  private final LongAdder bloomBlockSize = new LongAdder();
187
188  /** Current number of cached elements */
189  private final AtomicLong elements;
190
191  /** Current number of cached data block elements */
192  private final LongAdder dataBlockElements = new LongAdder();
193
194  /** Current number of cached index block elements */
195  private final LongAdder indexBlockElements = new LongAdder();
196
197  /** Current number of cached bloom block elements */
198  private final LongAdder bloomBlockElements = new LongAdder();
199
200  /** Cache access count (sequential ID) */
201  private final AtomicLong count;
202
203  /** hard capacity limit */
204  private float hardCapacityLimitFactor;
205
206  /** Cache statistics */
207  private final CacheStats stats;
208
209  /** Maximum allowable size of cache (block put if size > max, evict) */
210  private long maxSize;
211
212  /** Approximate block size */
213  private long blockSize;
214
215  /** Acceptable size of cache (no evictions if size < acceptable) */
216  private float acceptableFactor;
217
218  /** Minimum threshold of cache (when evicting, evict until size < min) */
219  private float minFactor;
220
221  /** Single access bucket size */
222  private float singleFactor;
223
224  /** Multiple access bucket size */
225  private float multiFactor;
226
227  /** In-memory bucket size */
228  private float memoryFactor;
229
230  /** Overhead of the structure itself */
231  private long overhead;
232
233  /** Whether in-memory hfile's data block has higher priority when evicting */
234  private boolean forceInMemory;
235
236  /**
237   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
238   * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
239   */
240  private transient BlockCache victimHandler = null;
241
242  /**
243   * Default constructor. Specify maximum size and expected average block size (approximation is
244   * fine).
245   * <p>
246   * All other factors will be calculated based on defaults specified in this class.
247   * @param maxSize   maximum size of cache, in bytes
248   * @param blockSize approximate size of each block, in bytes
249   */
250  public LruBlockCache(long maxSize, long blockSize) {
251    this(maxSize, blockSize, true);
252  }
253
254  /**
255   * Constructor used for testing. Allows disabling of the eviction thread.
256   */
257  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
258    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
259      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
260      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR,
261      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE);
262  }
263
264  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
265    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
266      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
267      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
268      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
269      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
270      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
271      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
272      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
273      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
274      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
275  }
276
277  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
278    this(maxSize, blockSize, true, conf);
279  }
280
281  /**
282   * Configurable constructor. Use this constructor if not using defaults.
283   * @param maxSize             maximum size of this cache, in bytes
284   * @param blockSize           expected average size of blocks, in bytes
285   * @param evictionThread      whether to run evictions in a bg thread or not
286   * @param mapInitialSize      initial size of backing ConcurrentHashMap
287   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
288   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
289   * @param minFactor           percentage of total size that eviction will evict until
290   * @param acceptableFactor    percentage of total size that triggers eviction
291   * @param singleFactor        percentage of total size for single-access blocks
292   * @param multiFactor         percentage of total size for multiple-access blocks
293   * @param memoryFactor        percentage of total size for in-memory blocks
294   */
295  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize,
296    float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor,
297    float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor,
298    boolean forceInMemory, long maxBlockSize) {
299    this.maxBlockSize = maxBlockSize;
300    if (
301      singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0
302        || memoryFactor < 0
303    ) {
304      throw new IllegalArgumentException(
305        "Single, multi, and memory factors " + " should be non-negative and total 1.0");
306    }
307    if (minFactor >= acceptableFactor) {
308      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
309    }
310    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
311      throw new IllegalArgumentException("all factors must be < 1");
312    }
313    this.maxSize = maxSize;
314    this.blockSize = blockSize;
315    this.forceInMemory = forceInMemory;
316    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
317    this.minFactor = minFactor;
318    this.acceptableFactor = acceptableFactor;
319    this.singleFactor = singleFactor;
320    this.multiFactor = multiFactor;
321    this.memoryFactor = memoryFactor;
322    this.stats = new CacheStats(this.getClass().getSimpleName());
323    this.count = new AtomicLong(0);
324    this.elements = new AtomicLong(0);
325    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
326    this.size = new AtomicLong(this.overhead);
327    this.hardCapacityLimitFactor = hardLimitFactor;
328    if (evictionThread) {
329      this.evictionThread = new EvictionThread(this);
330      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
331    } else {
332      this.evictionThread = null;
333    }
334    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
335    // every five minutes.
336    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
337      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
338  }
339
340  @Override
341  public void setVictimCache(BlockCache victimCache) {
342    if (victimHandler != null) {
343      throw new IllegalArgumentException("The victim cache has already been set");
344    }
345    victimHandler = requireNonNull(victimCache);
346  }
347
348  @Override
349  public void setMaxSize(long maxSize) {
350    this.maxSize = maxSize;
351    if (this.size.get() > acceptableSize() && !evictionInProgress) {
352      runEviction();
353    }
354  }
355
356  /**
357   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
358   * access will be more faster then off-heap, the small index block or meta block cached in
359   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
360   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
361   * heap size will be messed up. Here we will clone the block into an heap block if it's an
362   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
363   * the block (HBASE-22127): <br>
364   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
365   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
366   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
367   * JVM, so need a retain here.
368   * @param buf the original block
369   * @return an block with an heap memory backend.
370   */
371  private Cacheable asReferencedHeapBlock(Cacheable buf) {
372    if (buf instanceof HFileBlock) {
373      HFileBlock blk = ((HFileBlock) buf);
374      if (blk.isSharedMem()) {
375        return HFileBlock.deepCloneOnHeap(blk);
376      }
377    }
378    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
379    return buf.retain();
380  }
381
382  // BlockCache implementation
383
384  /**
385   * Cache the block with the specified name and buffer.
386   * <p>
387   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
388   * this can happen, for which we compare the buffer contents.
389   * @param cacheKey block's cache key
390   * @param buf      block buffer
391   * @param inMemory if block is in-memory
392   */
393  @Override
394  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
395    if (buf.heapSize() > maxBlockSize) {
396      // If there are a lot of blocks that are too
397      // big this can make the logs way too noisy.
398      // So we log 2%
399      if (stats.failInsert() % 50 == 0) {
400        LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ "
401          + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than "
402          + maxBlockSize);
403      }
404      return;
405    }
406
407    LruCachedBlock cb = map.get(cacheKey);
408    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
409      return;
410    }
411    long currentSize = size.get();
412    long currentAcceptableSize = acceptableSize();
413    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
414    if (currentSize >= hardLimitSize) {
415      stats.failInsert();
416      if (LOG.isTraceEnabled()) {
417        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
418          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
419          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
420          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
421      }
422      if (!evictionInProgress) {
423        runEviction();
424      }
425      return;
426    }
427    // Ensure that the block is an heap one.
428    buf = asReferencedHeapBlock(buf);
429    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
430    long newSize = updateSizeMetrics(cb, false);
431    map.put(cacheKey, cb);
432    long val = elements.incrementAndGet();
433    if (buf.getBlockType().isBloom()) {
434      bloomBlockElements.increment();
435    } else if (buf.getBlockType().isIndex()) {
436      indexBlockElements.increment();
437    } else if (buf.getBlockType().isData()) {
438      dataBlockElements.increment();
439    }
440    if (LOG.isTraceEnabled()) {
441      long size = map.size();
442      assertCounterSanity(size, val);
443    }
444    if (newSize > currentAcceptableSize && !evictionInProgress) {
445      runEviction();
446    }
447  }
448
449  /**
450   * Sanity-checking for parity between actual block cache content and metrics. Intended only for
451   * use with TRACE level logging and -ea JVM.
452   */
453  private static void assertCounterSanity(long mapSize, long counterVal) {
454    if (counterVal < 0) {
455      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal
456        + ", mapSize=" + mapSize);
457      return;
458    }
459    if (mapSize < Integer.MAX_VALUE) {
460      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
461      if (pct_diff > 0.05) {
462        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal
463          + ", mapSize=" + mapSize);
464      }
465    }
466  }
467
468  /**
469   * Cache the block with the specified name and buffer.
470   * <p>
471   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
472   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
473   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
474   * otherwise the caching size is based on off-heap.
475   * @param cacheKey block's cache key
476   * @param buf      block buffer
477   */
478  @Override
479  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
480    cacheBlock(cacheKey, buf, false);
481  }
482
483  /**
484   * Helper function that updates the local size counter and also updates any per-cf or
485   * per-blocktype metrics it can discern from given {@link LruCachedBlock}
486   */
487  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
488    long heapsize = cb.heapSize();
489    BlockType bt = cb.getBuffer().getBlockType();
490    if (evict) {
491      heapsize *= -1;
492    }
493    if (bt != null) {
494      if (bt.isBloom()) {
495        bloomBlockSize.add(heapsize);
496      } else if (bt.isIndex()) {
497        indexBlockSize.add(heapsize);
498      } else if (bt.isData()) {
499        dataBlockSize.add(heapsize);
500      }
501    }
502    return size.addAndGet(heapsize);
503  }
504
505  /**
506   * Get the buffer of the block with the specified name.
507   * @param cacheKey           block's cache key
508   * @param caching            true if the caller caches blocks on cache misses
509   * @param repeat             Whether this is a repeat lookup for the same block (used to avoid
510   *                           double counting cache misses when doing double-check locking)
511   * @param updateCacheMetrics Whether to update cache metrics or not
512   * @return buffer of specified cache key, or null if not in cache
513   */
514  @Override
515  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
516    boolean updateCacheMetrics) {
517    // Note: 'map' must be a ConcurrentHashMap or the supplier may be invoked more than once.
518    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
519      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
520      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
521      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
522      // see HBASE-22422.
523      val.getBuffer().retain();
524      return val;
525    });
526    if (cb == null) {
527      if (!repeat && updateCacheMetrics) {
528        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
529      }
530      // If there is another block cache then try and read there.
531      // However if this is a retry ( second time in double checked locking )
532      // And it's already a miss then the l2 will also be a miss.
533      if (victimHandler != null && !repeat) {
534        // The handler will increase result's refCnt for RPC, so need no extra retain.
535        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
536        // Promote this to L1.
537        if (result != null) {
538          if (caching) {
539            cacheBlock(cacheKey, result, /* inMemory = */ false);
540          }
541        }
542        return result;
543      }
544      return null;
545    }
546    if (updateCacheMetrics) {
547      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
548    }
549    cb.access(count.incrementAndGet());
550    return cb.getBuffer();
551  }
552
553  /**
554   * Whether the cache contains block with specified cacheKey
555   * @return true if contains the block
556   */
557  @Override
558  public boolean containsBlock(BlockCacheKey cacheKey) {
559    return map.containsKey(cacheKey);
560  }
561
562  @Override
563  public boolean evictBlock(BlockCacheKey cacheKey) {
564    LruCachedBlock cb = map.get(cacheKey);
565    return cb != null && evictBlock(cb, false) > 0;
566  }
567
568  /**
569   * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a
570   * linear-time search through all blocks in the cache. Ideally this should be a search in a
571   * log-access-time map.
572   * <p>
573   * This is used for evict-on-close to remove all blocks of a specific HFile.
574   * @return the number of blocks evicted
575   */
576  @Override
577  public int evictBlocksByHfileName(String hfileName) {
578    int numEvicted = 0;
579    for (BlockCacheKey key : map.keySet()) {
580      if (key.getHfileName().equals(hfileName)) {
581        if (evictBlock(key)) {
582          ++numEvicted;
583        }
584      }
585    }
586    if (victimHandler != null) {
587      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
588    }
589    return numEvicted;
590  }
591
592  /**
593   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp; block may be
594   * read again later
595   * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread
596   * @return the heap size of evicted block
597   */
598  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
599    final MutableBoolean evicted = new MutableBoolean(false);
600    // Note: 'map' must be a ConcurrentHashMap or the supplier may be invoked more than once.
601    map.computeIfPresent(block.getCacheKey(), (k, v) -> {
602      // Run the victim handler before we remove the mapping in the L1 map. It must complete
603      // quickly because other removal or insertion operations can be blocked in the meantime.
604      if (evictedByEvictionProcess && victimHandler != null) {
605        victimHandler.cacheBlock(k, v.getBuffer());
606      }
607      // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
608      // NOT move this up because if we do that then the victimHandler may access the buffer with
609      // refCnt = 0 which is disallowed.
610      v.getBuffer().release();
611      evicted.setTrue();
612      // By returning null from the supplier we remove the mapping from the L1 map.
613      return null;
614    });
615    // If we didn't find anything to evict there is nothing more to do here.
616    if (evicted.isFalse()) {
617      return 0;
618    }
619    // We evicted the block so update L1 statistics.
620    updateSizeMetrics(block, true);
621    long val = elements.decrementAndGet();
622    if (LOG.isTraceEnabled()) {
623      long size = map.size();
624      assertCounterSanity(size, val);
625    }
626    BlockType bt = block.getBuffer().getBlockType();
627    if (bt.isBloom()) {
628      bloomBlockElements.decrement();
629    } else if (bt.isIndex()) {
630      indexBlockElements.decrement();
631    } else if (bt.isData()) {
632      dataBlockElements.decrement();
633    }
634    if (evictedByEvictionProcess) {
635      // When the eviction of the block happened because of invalidation of HFiles, no need to
636      // update the stats counter.
637      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
638    }
639    return block.heapSize();
640  }
641
642  /**
643   * Multi-threaded call to run the eviction process.
644   */
645  private void runEviction() {
646    if (evictionThread == null || !evictionThread.isGo()) {
647      evict();
648    } else {
649      evictionThread.evict();
650    }
651  }
652
653  boolean isEvictionInProgress() {
654    return evictionInProgress;
655  }
656
657  long getOverhead() {
658    return overhead;
659  }
660
661  /**
662   * Eviction method.
663   */
664  void evict() {
665
666    // Ensure only one eviction at a time
667    if (!evictionLock.tryLock()) {
668      return;
669    }
670
671    try {
672      evictionInProgress = true;
673      long currentSize = this.size.get();
674      long bytesToFree = currentSize - minSize();
675
676      if (LOG.isTraceEnabled()) {
677        LOG.trace("Block cache LRU eviction started; Attempting to free "
678          + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
679      }
680
681      if (bytesToFree <= 0) {
682        return;
683      }
684
685      // Instantiate priority buckets
686      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
687      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
688      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
689
690      // Scan entire map putting into appropriate buckets
691      for (LruCachedBlock cachedBlock : map.values()) {
692        switch (cachedBlock.getPriority()) {
693          case SINGLE: {
694            bucketSingle.add(cachedBlock);
695            break;
696          }
697          case MULTI: {
698            bucketMulti.add(cachedBlock);
699            break;
700          }
701          case MEMORY: {
702            bucketMemory.add(cachedBlock);
703            break;
704          }
705        }
706      }
707
708      long bytesFreed = 0;
709      if (forceInMemory || memoryFactor > 0.999f) {
710        long s = bucketSingle.totalSize();
711        long m = bucketMulti.totalSize();
712        if (bytesToFree > (s + m)) {
713          // this means we need to evict blocks in memory bucket to make room,
714          // so the single and multi buckets will be emptied
715          bytesFreed = bucketSingle.free(s);
716          bytesFreed += bucketMulti.free(m);
717          if (LOG.isTraceEnabled()) {
718            LOG.trace(
719              "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
720          }
721          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
722          if (LOG.isTraceEnabled()) {
723            LOG.trace(
724              "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
725          }
726        } else {
727          // this means no need to evict block in memory bucket,
728          // and we try best to make the ratio between single-bucket and
729          // multi-bucket is 1:2
730          long bytesRemain = s + m - bytesToFree;
731          if (3 * s <= bytesRemain) {
732            // single-bucket is small enough that no eviction happens for it
733            // hence all eviction goes from multi-bucket
734            bytesFreed = bucketMulti.free(bytesToFree);
735          } else if (3 * m <= 2 * bytesRemain) {
736            // multi-bucket is small enough that no eviction happens for it
737            // hence all eviction goes from single-bucket
738            bytesFreed = bucketSingle.free(bytesToFree);
739          } else {
740            // both buckets need to evict some blocks
741            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
742            if (bytesFreed < bytesToFree) {
743              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
744            }
745          }
746        }
747      } else {
748        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
749
750        bucketQueue.add(bucketSingle);
751        bucketQueue.add(bucketMulti);
752        bucketQueue.add(bucketMemory);
753
754        int remainingBuckets = bucketQueue.size();
755
756        BlockBucket bucket;
757        while ((bucket = bucketQueue.poll()) != null) {
758          long overflow = bucket.overflow();
759          if (overflow > 0) {
760            long bucketBytesToFree =
761              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
762            bytesFreed += bucket.free(bucketBytesToFree);
763          }
764          remainingBuckets--;
765        }
766      }
767      if (LOG.isTraceEnabled()) {
768        long single = bucketSingle.totalSize();
769        long multi = bucketMulti.totalSize();
770        long memory = bucketMemory.totalSize();
771        LOG.trace(
772          "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed)
773            + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single="
774            + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", "
775            + "memory=" + StringUtils.byteDesc(memory));
776      }
777    } finally {
778      stats.evict();
779      evictionInProgress = false;
780      evictionLock.unlock();
781    }
782  }
783
784  @Override
785  public String toString() {
786    return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
787      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
788      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
789      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
790      .add("heapSize", StringUtils.byteDesc(heapSize()))
791      .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor)
792      .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor)
793      .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor)
794      .toString();
795  }
796
797  /**
798   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority
799   * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of
800   * elements out of each according to configuration parameters and their relatives sizes.
801   */
802  private class BlockBucket implements Comparable<BlockBucket> {
803
804    private final String name;
805    private LruCachedBlockQueue queue;
806    private long totalSize = 0;
807    private long bucketSize;
808
809    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
810      this.name = name;
811      this.bucketSize = bucketSize;
812      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
813      totalSize = 0;
814    }
815
816    public void add(LruCachedBlock block) {
817      totalSize += block.heapSize();
818      queue.add(block);
819    }
820
821    public long free(long toFree) {
822      if (LOG.isTraceEnabled()) {
823        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
824      }
825      LruCachedBlock cb;
826      long freedBytes = 0;
827      while ((cb = queue.pollLast()) != null) {
828        freedBytes += evictBlock(cb, true);
829        if (freedBytes >= toFree) {
830          return freedBytes;
831        }
832      }
833      if (LOG.isTraceEnabled()) {
834        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
835      }
836      return freedBytes;
837    }
838
839    public long overflow() {
840      return totalSize - bucketSize;
841    }
842
843    public long totalSize() {
844      return totalSize;
845    }
846
847    @Override
848    public int compareTo(BlockBucket that) {
849      return Long.compare(this.overflow(), that.overflow());
850    }
851
852    @Override
853    public boolean equals(Object that) {
854      if (that == null || !(that instanceof BlockBucket)) {
855        return false;
856      }
857      return compareTo((BlockBucket) that) == 0;
858    }
859
860    @Override
861    public int hashCode() {
862      return Objects.hashCode(name, bucketSize, queue, totalSize);
863    }
864
865    @Override
866    public String toString() {
867      return MoreObjects.toStringHelper(this).add("name", name)
868        .add("totalSize", StringUtils.byteDesc(totalSize))
869        .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString();
870    }
871  }
872
873  /**
874   * Get the maximum size of this cache.
875   * @return max size in bytes
876   */
877
878  @Override
879  public long getMaxSize() {
880    return this.maxSize;
881  }
882
883  @Override
884  public long getCurrentSize() {
885    return this.size.get();
886  }
887
888  @Override
889  public long getCurrentDataSize() {
890    return this.dataBlockSize.sum();
891  }
892
893  public long getCurrentIndexSize() {
894    return this.indexBlockSize.sum();
895  }
896
897  public long getCurrentBloomSize() {
898    return this.bloomBlockSize.sum();
899  }
900
901  @Override
902  public long getFreeSize() {
903    return getMaxSize() - getCurrentSize();
904  }
905
906  @Override
907  public long size() {
908    return getMaxSize();
909  }
910
911  @Override
912  public long getBlockCount() {
913    return this.elements.get();
914  }
915
916  @Override
917  public long getDataBlockCount() {
918    return this.dataBlockElements.sum();
919  }
920
921  public long getIndexBlockCount() {
922    return this.indexBlockElements.sum();
923  }
924
925  public long getBloomBlockCount() {
926    return this.bloomBlockElements.sum();
927  }
928
929  EvictionThread getEvictionThread() {
930    return this.evictionThread;
931  }
932
933  /*
934   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows
935   * above the acceptable level.<p> Thread is triggered into action by {@link
936   * LruBlockCache#runEviction()}
937   */
938  static class EvictionThread extends Thread {
939
940    private WeakReference<LruBlockCache> cache;
941    private volatile boolean go = true;
942    // flag set after enter the run method, used for test
943    private boolean enteringRun = false;
944
945    public EvictionThread(LruBlockCache cache) {
946      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
947      setDaemon(true);
948      this.cache = new WeakReference<>(cache);
949    }
950
951    @Override
952    public void run() {
953      enteringRun = true;
954      while (this.go) {
955        synchronized (this) {
956          try {
957            this.wait(1000 * 10/* Don't wait for ever */);
958          } catch (InterruptedException e) {
959            LOG.warn("Interrupted eviction thread ", e);
960            Thread.currentThread().interrupt();
961          }
962        }
963        LruBlockCache cache = this.cache.get();
964        if (cache == null) {
965          this.go = false;
966          break;
967        }
968        cache.evict();
969      }
970    }
971
972    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
973        justification = "This is what we want")
974    public void evict() {
975      synchronized (this) {
976        this.notifyAll();
977      }
978    }
979
980    synchronized void shutdown() {
981      this.go = false;
982      this.notifyAll();
983    }
984
985    public boolean isGo() {
986      return go;
987    }
988
989    /**
990     * Used for the test.
991     */
992    boolean isEnteringRun() {
993      return this.enteringRun;
994    }
995  }
996
997  /*
998   * Statistics thread. Periodically prints the cache statistics to the log.
999   */
1000  static class StatisticsThread extends Thread {
1001
1002    private final LruBlockCache lru;
1003
1004    public StatisticsThread(LruBlockCache lru) {
1005      super("LruBlockCacheStats");
1006      setDaemon(true);
1007      this.lru = lru;
1008    }
1009
1010    @Override
1011    public void run() {
1012      lru.logStats();
1013    }
1014  }
1015
1016  public void logStats() {
1017    // Log size
1018    long usedSize = heapSize();
1019    long freeSize = maxSize - usedSize;
1020    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(maxSize) + ", " + "usedSize="
1021      + StringUtils.byteDesc(usedSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
1022      + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + "blockCount=" + getBlockCount() + ", "
1023      + "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", "
1024      + "hitRatio="
1025      + (stats.getHitCount() == 0
1026        ? "0"
1027        : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", "))
1028      + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits="
1029      + stats.getHitCachingCount() + ", " + "cachingHitsRatio="
1030      + (stats.getHitCachingCount() == 0
1031        ? "0,"
1032        : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", "))
1033      + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", "
1034      + "evictedPerRun=" + stats.evictedPerEviction());
1035  }
1036
1037  /**
1038   * Get counter statistics for this cache.
1039   * <p>
1040   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
1041   */
1042  @Override
1043  public CacheStats getStats() {
1044    return this.stats;
1045  }
1046
1047  public final static long CACHE_FIXED_OVERHEAD =
1048    ClassSize.estimateBase(LruBlockCache.class, false);
1049
1050  @Override
1051  public long heapSize() {
1052    return getCurrentSize();
1053  }
1054
1055  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1056    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1057    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1058      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1059      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1060  }
1061
1062  @Override
1063  public Iterator<CachedBlock> iterator() {
1064    final Iterator<LruCachedBlock> iterator = map.values().iterator();
1065
1066    return new Iterator<CachedBlock>() {
1067      private final long now = System.nanoTime();
1068
1069      @Override
1070      public boolean hasNext() {
1071        return iterator.hasNext();
1072      }
1073
1074      @Override
1075      public CachedBlock next() {
1076        final LruCachedBlock b = iterator.next();
1077        return new CachedBlock() {
1078          @Override
1079          public String toString() {
1080            return BlockCacheUtil.toString(this, now);
1081          }
1082
1083          @Override
1084          public BlockPriority getBlockPriority() {
1085            return b.getPriority();
1086          }
1087
1088          @Override
1089          public BlockType getBlockType() {
1090            return b.getBuffer().getBlockType();
1091          }
1092
1093          @Override
1094          public long getOffset() {
1095            return b.getCacheKey().getOffset();
1096          }
1097
1098          @Override
1099          public long getSize() {
1100            return b.getBuffer().heapSize();
1101          }
1102
1103          @Override
1104          public long getCachedTime() {
1105            return b.getCachedTime();
1106          }
1107
1108          @Override
1109          public String getFilename() {
1110            return b.getCacheKey().getHfileName();
1111          }
1112
1113          @Override
1114          public int compareTo(CachedBlock other) {
1115            int diff = this.getFilename().compareTo(other.getFilename());
1116            if (diff != 0) {
1117              return diff;
1118            }
1119            diff = Long.compare(this.getOffset(), other.getOffset());
1120            if (diff != 0) {
1121              return diff;
1122            }
1123            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1124              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1125            }
1126            return Long.compare(other.getCachedTime(), this.getCachedTime());
1127          }
1128
1129          @Override
1130          public int hashCode() {
1131            return b.hashCode();
1132          }
1133
1134          @Override
1135          public boolean equals(Object obj) {
1136            if (obj instanceof CachedBlock) {
1137              CachedBlock cb = (CachedBlock) obj;
1138              return compareTo(cb) == 0;
1139            } else {
1140              return false;
1141            }
1142          }
1143        };
1144      }
1145
1146      @Override
1147      public void remove() {
1148        throw new UnsupportedOperationException();
1149      }
1150    };
1151  }
1152
1153  // Simple calculators of sizes given factors and maxSize
1154
1155  long acceptableSize() {
1156    return (long) Math.floor(this.maxSize * this.acceptableFactor);
1157  }
1158
1159  private long minSize() {
1160    return (long) Math.floor(this.maxSize * this.minFactor);
1161  }
1162
1163  private long singleSize() {
1164    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1165  }
1166
1167  private long multiSize() {
1168    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1169  }
1170
1171  private long memorySize() {
1172    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1173  }
1174
1175  @Override
1176  public void shutdown() {
1177    if (victimHandler != null) {
1178      victimHandler.shutdown();
1179    }
1180    this.scheduleThreadPool.shutdown();
1181    for (int i = 0; i < 10; i++) {
1182      if (!this.scheduleThreadPool.isShutdown()) {
1183        try {
1184          Thread.sleep(10);
1185        } catch (InterruptedException e) {
1186          LOG.warn("Interrupted while sleeping");
1187          Thread.currentThread().interrupt();
1188          break;
1189        }
1190      }
1191    }
1192
1193    if (!this.scheduleThreadPool.isShutdown()) {
1194      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1195      LOG.debug("Still running " + runnables);
1196    }
1197    this.evictionThread.shutdown();
1198  }
1199
1200  /** Clears the cache. Used in tests. */
1201  public void clearCache() {
1202    this.map.clear();
1203    this.elements.set(0);
1204  }
1205
1206  /**
1207   * Used in testing. May be very inefficient.
1208   * @return the set of cached file names
1209   */
1210  SortedSet<String> getCachedFileNamesForTest() {
1211    SortedSet<String> fileNames = new TreeSet<>();
1212    for (BlockCacheKey cacheKey : map.keySet()) {
1213      fileNames.add(cacheKey.getHfileName());
1214    }
1215    return fileNames;
1216  }
1217
1218  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1219    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1220    for (LruCachedBlock block : map.values()) {
1221      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1222      Integer count = counts.get(encoding);
1223      counts.put(encoding, (count == null ? 0 : count) + 1);
1224    }
1225    return counts;
1226  }
1227
1228  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1229    return map;
1230  }
1231
1232  @Override
1233  public BlockCache[] getBlockCaches() {
1234    if (victimHandler != null) {
1235      return new BlockCache[] { this, this.victimHandler };
1236    }
1237    return null;
1238  }
1239}