001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.io.HeapSize;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
047import org.apache.hbase.thirdparty.com.google.common.base.Objects;
048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
049
050/**
051 * A block cache implementation that is memory-aware using {@link HeapSize},
052 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
053 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
054 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
055 *
056 * Contains three levels of block priority to allow for scan-resistance and in-memory families
057 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
058 * family is a column family that should be served from memory if possible):
059 * single-access, multiple-accesses, and in-memory priority.
060 * A block is added with an in-memory priority flag if
061 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
062 * single access priority the first time it is read into this block cache.  If a block is
 * accessed again while in cache, it is marked as a multiple access priority block.  This
 * delineation of blocks prevents scans from thrashing the cache, and it adds a
 * least-frequently-used element to the eviction algorithm.<p>
066 *
067 * Each priority is given its own chunk of the total cache to ensure
068 * fairness during eviction.  Each priority will retain close to its maximum
069 * size, however, if any priority is not using its entire chunk the others
070 * are able to grow beyond their chunk size.<p>
071 *
072 * Instantiated at a minimum with the total size and average block size.
073 * All sizes are in bytes.  The block size is not especially important as this
074 * cache is fully dynamic in its sizing of blocks.  It is only used for
075 * pre-allocating data structures and in initial heap estimation of the map.<p>
076 *
077 * The detailed constructor defines the sizes for the three priorities (they
078 * should total to the <code>maximum size</code> defined).  It also sets the levels that
079 * trigger and control the eviction thread.<p>
080 *
081 * The <code>acceptable size</code> is the cache size level which triggers the eviction
082 * process to start.  It evicts enough blocks to get the size below the
083 * minimum size specified.<p>
084 *
 * Eviction happens in a separate thread (or on the calling thread when the eviction thread is
 * disabled) and involves a single full scan of the map.  It determines how many bytes must be
 * freed to reach the minimum size, and then, while scanning, collects the fewest
 * least-recently-used blocks necessary from each of the three priorities (so up to three times
 * the number of bytes to free in total).  It then uses the priority chunk sizes to evict fairly
 * according to the relative sizes and usage.
091 */
092@InterfaceAudience.Private
093public class LruBlockCache implements FirstLevelBlockCache {
094
  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   * Must be strictly smaller than the acceptable factor and smaller than 1 (checked in the
   * constructor).
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache (no evictions if size < acceptable). Once a put pushes the cache
   * above the acceptable size, an eviction run is triggered. Must be smaller than 1.
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of cache: any put is rejected while the current size is
   * {@code >=} this factor times the acceptable size.
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.hard.capacity.limit.factor";
  // Shares of the cache for the single-access, multi-access and in-memory priority buckets.
  // The three values must be non-negative and add up to exactly 1.0 (checked in the constructor).
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key for "in-memory force mode". When enabled, eviction drains the single- and
   * multi-access buckets completely before evicting anything from the in-memory bucket. Unlike
   * inMemory, which is a column-family setting, inMemoryForceMode is a cluster-wide setting.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
      "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds (fractions of the maximum cache size) */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets (must sum to 1.0) */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  /* Multiple of the acceptable size at which new puts are rejected */
  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread period, in seconds (scheduled with TimeUnit.SECONDS) */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  /* Blocks whose heap size exceeds this limit are never cached */
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
152
  /**
   * The cache map is declared as a {@link ConcurrentHashMap} because
   * {@link LruBlockCache#getBlock} relies on map#computeIfPresent(key, func) being atomic: the
   * func must execute exactly once, only when the key is present, and under the map's lock,
   * otherwise the block's reference count would be corrupted. Note that
   * {@link java.util.concurrent.ConcurrentSkipListMap} does not guarantee that.
   */
  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

  /**
   * Eviction lock (held while an eviction is in progress). It is a fair lock that evict()
   * acquires with tryLock, so a second concurrent eviction attempt returns immediately.
   */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Blocks whose heap size exceeds this many bytes are rejected by cacheBlock. */
  private final long maxBlockSize;

  /**
   * Set while evict() runs. Read without locking to avoid triggering a redundant eviction from
   * cacheBlock/setMaxSize.
   */
  private volatile boolean evictionInProgress = false;

  /** Background eviction thread, or null when evictions run inline on the calling thread. */
  private transient final EvictionThread evictionThread;

  /**
   * Single-threaded daemon pool that periodically runs the statistics logger
   * (for heavy debugging, could remove).
   */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache in bytes, starting from the structural overhead. */
  private final AtomicLong size;

  /** Current size of data blocks */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /** Monotonic access counter; its next value is stamped onto blocks when cached or accessed. */
  private final AtomicLong count;

  /** Puts are rejected once size reaches this factor times the acceptable size. */
  private float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum size of the cache in bytes (set by the constructor and by setMaxSize). */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Overhead of the structure itself; also the initial value of {@code size}. */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
   * external cache as L2. Can be set at most once, via setVictimCache.
   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;
232
233  /**
234   * Default constructor.  Specify maximum size and expected average block
235   * size (approximation is fine).
236   *
237   * <p>All other factors will be calculated based on defaults specified in
238   * this class.
239   *
240   * @param maxSize   maximum size of cache, in bytes
241   * @param blockSize approximate size of each block, in bytes
242   */
243  public LruBlockCache(long maxSize, long blockSize) {
244    this(maxSize, blockSize, true);
245  }
246
247  /**
248   * Constructor used for testing.  Allows disabling of the eviction thread.
249   */
250  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
251    this(maxSize, blockSize, evictionThread,
252        (int) Math.ceil(1.2 * maxSize / blockSize),
253        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
254        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
255        DEFAULT_SINGLE_FACTOR,
256        DEFAULT_MULTI_FACTOR,
257        DEFAULT_MEMORY_FACTOR,
258        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
259        false,
260        DEFAULT_MAX_BLOCK_SIZE);
261  }
262
263  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
264    this(maxSize, blockSize, evictionThread,
265        (int) Math.ceil(1.2 * maxSize / blockSize),
266        DEFAULT_LOAD_FACTOR,
267        DEFAULT_CONCURRENCY_LEVEL,
268        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
269        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
270        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
271        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
272        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
273        conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
274                      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
275        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
276        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
277  }
278
279  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
280    this(maxSize, blockSize, true, conf);
281  }
282
283  /**
284   * Configurable constructor.  Use this constructor if not using defaults.
285   *
286   * @param maxSize             maximum size of this cache, in bytes
287   * @param blockSize           expected average size of blocks, in bytes
288   * @param evictionThread      whether to run evictions in a bg thread or not
289   * @param mapInitialSize      initial size of backing ConcurrentHashMap
290   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
291   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
292   * @param minFactor           percentage of total size that eviction will evict until
293   * @param acceptableFactor    percentage of total size that triggers eviction
294   * @param singleFactor        percentage of total size for single-access blocks
295   * @param multiFactor         percentage of total size for multiple-access blocks
296   * @param memoryFactor        percentage of total size for in-memory blocks
297   */
298  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
299      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
300      float minFactor, float acceptableFactor, float singleFactor,
301      float multiFactor, float memoryFactor, float hardLimitFactor,
302      boolean forceInMemory, long maxBlockSize) {
303    this.maxBlockSize = maxBlockSize;
304    if(singleFactor + multiFactor + memoryFactor != 1 ||
305        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
306      throw new IllegalArgumentException("Single, multi, and memory factors " +
307          " should be non-negative and total 1.0");
308    }
309    if (minFactor >= acceptableFactor) {
310      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
311    }
312    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
313      throw new IllegalArgumentException("all factors must be < 1");
314    }
315    this.maxSize = maxSize;
316    this.blockSize = blockSize;
317    this.forceInMemory = forceInMemory;
318    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
319    this.minFactor = minFactor;
320    this.acceptableFactor = acceptableFactor;
321    this.singleFactor = singleFactor;
322    this.multiFactor = multiFactor;
323    this.memoryFactor = memoryFactor;
324    this.stats = new CacheStats(this.getClass().getSimpleName());
325    this.count = new AtomicLong(0);
326    this.elements = new AtomicLong(0);
327    this.dataBlockElements = new LongAdder();
328    this.dataBlockSize = new LongAdder();
329    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
330    this.size = new AtomicLong(this.overhead);
331    this.hardCapacityLimitFactor = hardLimitFactor;
332    if (evictionThread) {
333      this.evictionThread = new EvictionThread(this);
334      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
335    } else {
336      this.evictionThread = null;
337    }
338    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
339    // every five minutes.
340    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
341                                                STAT_THREAD_PERIOD, TimeUnit.SECONDS);
342  }
343
344  @Override
345  public void setVictimCache(BlockCache victimCache) {
346    if (victimHandler != null) {
347      throw new IllegalArgumentException("The victim cache has already been set");
348    }
349    victimHandler = requireNonNull(victimCache);
350  }
351
352  @Override
353  public void setMaxSize(long maxSize) {
354    this.maxSize = maxSize;
355    if (this.size.get() > acceptableSize() && !evictionInProgress) {
356      runEviction();
357    }
358  }
359
360  /**
361   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
362   * access will be more faster then off-heap, the small index block or meta block cached in
363   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
364   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
365   * heap size will be messed up. Here we will clone the block into an heap block if it's an
366   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
367   * the block (HBASE-22127): <br>
368   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
369   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
370   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
371   * JVM, so need a retain here.
372   * @param buf the original block
373   * @return an block with an heap memory backend.
374   */
375  private Cacheable asReferencedHeapBlock(Cacheable buf) {
376    if (buf instanceof HFileBlock) {
377      HFileBlock blk = ((HFileBlock) buf);
378      if (blk.isSharedMem()) {
379        return HFileBlock.deepCloneOnHeap(blk);
380      }
381    }
382    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
383    return buf.retain();
384  }
385
386  // BlockCache implementation
387
388  /**
389   * Cache the block with the specified name and buffer.
390   * <p>
391   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
392   * this can happen, for which we compare the buffer contents.
393   *
394   * @param cacheKey block's cache key
395   * @param buf      block buffer
396   * @param inMemory if block is in-memory
397   */
398  @Override
399  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
400    if (buf.heapSize() > maxBlockSize) {
401      // If there are a lot of blocks that are too
402      // big this can make the logs way too noisy.
403      // So we log 2%
404      if (stats.failInsert() % 50 == 0) {
405        LOG.warn("Trying to cache too large a block "
406            + cacheKey.getHfileName() + " @ "
407            + cacheKey.getOffset()
408            + " is " + buf.heapSize()
409            + " which is larger than " + maxBlockSize);
410      }
411      return;
412    }
413
414    LruCachedBlock cb = map.get(cacheKey);
415    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
416      return;
417    }
418    long currentSize = size.get();
419    long currentAcceptableSize = acceptableSize();
420    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
421    if (currentSize >= hardLimitSize) {
422      stats.failInsert();
423      if (LOG.isTraceEnabled()) {
424        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
425          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
426          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
427          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
428      }
429      if (!evictionInProgress) {
430        runEviction();
431      }
432      return;
433    }
434    // Ensure that the block is an heap one.
435    buf = asReferencedHeapBlock(buf);
436    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
437    long newSize = updateSizeMetrics(cb, false);
438    map.put(cacheKey, cb);
439    long val = elements.incrementAndGet();
440    if (buf.getBlockType().isData()) {
441      dataBlockElements.increment();
442    }
443    if (LOG.isTraceEnabled()) {
444      long size = map.size();
445      assertCounterSanity(size, val);
446    }
447    if (newSize > currentAcceptableSize && !evictionInProgress) {
448      runEviction();
449    }
450  }
451
452  /**
453   * Sanity-checking for parity between actual block cache content and metrics.
454   * Intended only for use with TRACE level logging and -ea JVM.
455   */
456  private static void assertCounterSanity(long mapSize, long counterVal) {
457    if (counterVal < 0) {
458      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
459        ", mapSize=" + mapSize);
460      return;
461    }
462    if (mapSize < Integer.MAX_VALUE) {
463      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
464      if (pct_diff > 0.05) {
465        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
466          ", mapSize=" + mapSize);
467      }
468    }
469  }
470
471  /**
472   * Cache the block with the specified name and buffer.
473   * <p>
474   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
475   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
476   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
477   * otherwise the caching size is based on off-heap.
478   * @param cacheKey block's cache key
479   * @param buf block buffer
480   */
481  @Override
482  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
483    cacheBlock(cacheKey, buf, false);
484  }
485
486  /**
487   * Helper function that updates the local size counter and also updates any
488   * per-cf or per-blocktype metrics it can discern from given
489   * {@link LruCachedBlock}
490   */
491  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
492    long heapsize = cb.heapSize();
493    BlockType bt = cb.getBuffer().getBlockType();
494    if (evict) {
495      heapsize *= -1;
496    }
497    if (bt != null && bt.isData()) {
498      dataBlockSize.add(heapsize);
499    }
500    return size.addAndGet(heapsize);
501  }
502
503  /**
504   * Get the buffer of the block with the specified name.
505   *
506   * @param cacheKey           block's cache key
507   * @param caching            true if the caller caches blocks on cache misses
508   * @param repeat             Whether this is a repeat lookup for the same block
509   *                           (used to avoid double counting cache misses when doing double-check
510   *                           locking)
511   * @param updateCacheMetrics Whether to update cache metrics or not
512   *
513   * @return buffer of specified cache key, or null if not in cache
514   */
515  @Override
516  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
517      boolean updateCacheMetrics) {
518    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
519      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
520      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
521      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
522      // see HBASE-22422.
523      val.getBuffer().retain();
524      return val;
525    });
526    if (cb == null) {
527      if (!repeat && updateCacheMetrics) {
528        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
529      }
530      // If there is another block cache then try and read there.
531      // However if this is a retry ( second time in double checked locking )
532      // And it's already a miss then the l2 will also be a miss.
533      if (victimHandler != null && !repeat) {
534        // The handler will increase result's refCnt for RPC, so need no extra retain.
535        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
536        // Promote this to L1.
537        if (result != null) {
538          if (caching) {
539            cacheBlock(cacheKey, result, /* inMemory = */ false);
540          }
541        }
542        return result;
543      }
544      return null;
545    }
546    if (updateCacheMetrics) {
547      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
548    }
549    cb.access(count.incrementAndGet());
550    return cb.getBuffer();
551  }
552
553  /**
554   * Whether the cache contains block with specified cacheKey
555   *
556   * @return true if contains the block
557   */
558  @Override
559  public boolean containsBlock(BlockCacheKey cacheKey) {
560    return map.containsKey(cacheKey);
561  }
562
563  @Override
564  public boolean evictBlock(BlockCacheKey cacheKey) {
565    LruCachedBlock cb = map.get(cacheKey);
566    return cb != null && evictBlock(cb, false) > 0;
567  }
568
569  /**
570   * Evicts all blocks for a specific HFile. This is an
571   * expensive operation implemented as a linear-time search through all blocks
572   * in the cache. Ideally this should be a search in a log-access-time map.
573   *
574   * <p>
575   * This is used for evict-on-close to remove all blocks of a specific HFile.
576   *
577   * @return the number of blocks evicted
578   */
579  @Override
580  public int evictBlocksByHfileName(String hfileName) {
581    int numEvicted = 0;
582    for (BlockCacheKey key : map.keySet()) {
583      if (key.getHfileName().equals(hfileName)) {
584        if (evictBlock(key)) {
585          ++numEvicted;
586        }
587      }
588    }
589    if (victimHandler != null) {
590      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
591    }
592    return numEvicted;
593  }
594
595  /**
596   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
597   * block may be read again later
598   *
599   * @param evictedByEvictionProcess true if the given block is evicted by
600   *          EvictionThread
601   * @return the heap size of evicted block
602   */
603  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
604    LruCachedBlock previous = map.remove(block.getCacheKey());
605    if (previous == null) {
606      return 0;
607    }
608    updateSizeMetrics(block, true);
609    long val = elements.decrementAndGet();
610    if (LOG.isTraceEnabled()) {
611      long size = map.size();
612      assertCounterSanity(size, val);
613    }
614    if (block.getBuffer().getBlockType().isData()) {
615      dataBlockElements.decrement();
616    }
617    if (evictedByEvictionProcess) {
618      // When the eviction of the block happened because of invalidation of HFiles, no need to
619      // update the stats counter.
620      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
621      if (victimHandler != null) {
622        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
623      }
624    }
625    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
626    // NOT move this up because if do that then the victimHandler may access the buffer with
627    // refCnt = 0 which is disallowed.
628    previous.getBuffer().release();
629    return block.heapSize();
630  }
631
632  /**
633   * Multi-threaded call to run the eviction process.
634   */
635  private void runEviction() {
636    if (evictionThread == null) {
637      evict();
638    } else {
639      evictionThread.evict();
640    }
641  }
642
643  boolean isEvictionInProgress() {
644    return evictionInProgress;
645  }
646
647  long getOverhead() {
648    return overhead;
649  }
650
  /**
   * Eviction method. Frees enough bytes to bring the cache size down to minSize().
   * <p>
   * Only one eviction runs at a time: the eviction lock is taken with tryLock, so a concurrent
   * caller returns immediately instead of waiting. All blocks are first sorted into
   * single/multi/memory buckets by priority. Bytes are then freed using one of two strategies:
   * <ul>
   * <li>if in-memory force mode is on, or the memory factor is ~1.0, the memory bucket is only
   * touched after the single and multi buckets have been drained completely. Otherwise
   * single/multi are evicted aiming for a 1:2 ratio of what remains;</li>
   * <li>otherwise the buckets are polled from a PriorityQueue (ordering defined by
   * BlockBucket#compareTo, not shown here). Each bucket that overflows frees at most its overflow,
   * capped at an equal share of the bytes still to free.</li>
   * </ul>
   * {@code stats.evict()} runs in the finally block, so it counts every run that acquired the
   * lock, including runs that found nothing to free.
   */
  void evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) {
      return;
    }

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      // Already at or below the minimum size: nothing to do (finally still runs).
      if (bytesToFree <= 0) {
        return;
      }

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      // In-memory blocks get strict precedence: evict them only as a last resort.
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try our best to make the ratio between single-bucket and
          // multi-bucket 1:2 (single targets bytesRemain / 3, multi 2 * bytesRemain / 3)
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // Fair mode: visit buckets in BlockBucket's natural order; each overflowing bucket frees
        // at most its overflow, capped at an even split of what is still left to free.
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            long bucketBytesToFree =
                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      // Always count the run, clear the flag and release the lock, even on early return.
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
775
776  @Override
777  public String toString() {
778    return MoreObjects.toStringHelper(this)
779      .add("blockCount", getBlockCount())
780      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
781      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
782      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
783      .add("heapSize", StringUtils.byteDesc(heapSize()))
784      .add("minSize", StringUtils.byteDesc(minSize()))
785      .add("minFactor", minFactor)
786      .add("multiSize", StringUtils.byteDesc(multiSize()))
787      .add("multiFactor", multiFactor)
788      .add("singleSize", StringUtils.byteDesc(singleSize()))
789      .add("singleFactor", singleFactor)
790      .toString();
791  }
792
793  /**
794   * Used to group blocks into priority buckets.  There will be a BlockBucket
795   * for each priority (single, multi, memory).  Once bucketed, the eviction
796   * algorithm takes the appropriate number of elements out of each according
797   * to configuration parameters and their relatives sizes.
798   */
799  private class BlockBucket implements Comparable<BlockBucket> {
800
801    private final String name;
802    private LruCachedBlockQueue queue;
803    private long totalSize = 0;
804    private long bucketSize;
805
806    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
807      this.name = name;
808      this.bucketSize = bucketSize;
809      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
810      totalSize = 0;
811    }
812
813    public void add(LruCachedBlock block) {
814      totalSize += block.heapSize();
815      queue.add(block);
816    }
817
818    public long free(long toFree) {
819      if (LOG.isTraceEnabled()) {
820        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
821      }
822      LruCachedBlock cb;
823      long freedBytes = 0;
824      while ((cb = queue.pollLast()) != null) {
825        freedBytes += evictBlock(cb, true);
826        if (freedBytes >= toFree) {
827          return freedBytes;
828        }
829      }
830      if (LOG.isTraceEnabled()) {
831        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
832      }
833      return freedBytes;
834    }
835
836    public long overflow() {
837      return totalSize - bucketSize;
838    }
839
840    public long totalSize() {
841      return totalSize;
842    }
843
844    @Override
845    public int compareTo(BlockBucket that) {
846      return Long.compare(this.overflow(), that.overflow());
847    }
848
849    @Override
850    public boolean equals(Object that) {
851      if (that == null || !(that instanceof BlockBucket)) {
852        return false;
853      }
854      return compareTo((BlockBucket)that) == 0;
855    }
856
857    @Override
858    public int hashCode() {
859      return Objects.hashCode(name, bucketSize, queue, totalSize);
860    }
861
862    @Override
863    public String toString() {
864      return MoreObjects.toStringHelper(this)
865        .add("name", name)
866        .add("totalSize", StringUtils.byteDesc(totalSize))
867        .add("bucketSize", StringUtils.byteDesc(bucketSize))
868        .toString();
869    }
870  }
871
872  /**
873   * Get the maximum size of this cache.
874   *
875   * @return max size in bytes
876   */
877
878  @Override
879  public long getMaxSize() {
880    return this.maxSize;
881  }
882
883  @Override
884  public long getCurrentSize() {
885    return this.size.get();
886  }
887
888  @Override
889  public long getCurrentDataSize() {
890    return this.dataBlockSize.sum();
891  }
892
893  @Override
894  public long getFreeSize() {
895    return getMaxSize() - getCurrentSize();
896  }
897
898  @Override
899  public long size() {
900    return getMaxSize();
901  }
902
903  @Override
904  public long getBlockCount() {
905    return this.elements.get();
906  }
907
908  @Override
909  public long getDataBlockCount() {
910    return this.dataBlockElements.sum();
911  }
912
913  EvictionThread getEvictionThread() {
914    return this.evictionThread;
915  }
916
917  /*
918   * Eviction thread.  Sits in waiting state until an eviction is triggered
919   * when the cache size grows above the acceptable level.<p>
920   *
921   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
922   */
923  static class EvictionThread extends Thread {
924
925    private WeakReference<LruBlockCache> cache;
926    private volatile boolean go = true;
927    // flag set after enter the run method, used for test
928    private boolean enteringRun = false;
929
930    public EvictionThread(LruBlockCache cache) {
931      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
932      setDaemon(true);
933      this.cache = new WeakReference<>(cache);
934    }
935
936    @Override
937    public void run() {
938      enteringRun = true;
939      while (this.go) {
940        synchronized (this) {
941          try {
942            this.wait(1000 * 10/*Don't wait for ever*/);
943          } catch (InterruptedException e) {
944            LOG.warn("Interrupted eviction thread ", e);
945            Thread.currentThread().interrupt();
946          }
947        }
948        LruBlockCache cache = this.cache.get();
949        if (cache == null) {
950          break;
951        }
952        cache.evict();
953      }
954    }
955
956    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
957        justification="This is what we want")
958    public void evict() {
959      synchronized (this) {
960        this.notifyAll();
961      }
962    }
963
964    synchronized void shutdown() {
965      this.go = false;
966      this.notifyAll();
967    }
968
969    /**
970     * Used for the test.
971     */
972    boolean isEnteringRun() {
973      return this.enteringRun;
974    }
975  }
976
977  /*
978   * Statistics thread.  Periodically prints the cache statistics to the log.
979   */
980  static class StatisticsThread extends Thread {
981
982    private final LruBlockCache lru;
983
984    public StatisticsThread(LruBlockCache lru) {
985      super("LruBlockCacheStats");
986      setDaemon(true);
987      this.lru = lru;
988    }
989
990    @Override
991    public void run() {
992      lru.logStats();
993    }
994  }
995
996  public void logStats() {
997    // Log size
998    long totalSize = heapSize();
999    long freeSize = maxSize - totalSize;
1000    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
1001        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
1002        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
1003        "blockCount=" + getBlockCount() + ", " +
1004        "accesses=" + stats.getRequestCount() + ", " +
1005        "hits=" + stats.getHitCount() + ", " +
1006        "hitRatio=" + (stats.getHitCount() == 0 ?
1007          "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
1008        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
1009        "cachingHits=" + stats.getHitCachingCount() + ", " +
1010        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
1011          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
1012        "evictions=" + stats.getEvictionCount() + ", " +
1013        "evicted=" + stats.getEvictedCount() + ", " +
1014        "evictedPerRun=" + stats.evictedPerEviction());
1015  }
1016
1017  /**
1018   * Get counter statistics for this cache.
1019   *
1020   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
1021   * of the eviction processes.
1022   */
1023  @Override
1024  public CacheStats getStats() {
1025    return this.stats;
1026  }
1027
1028  public final static long CACHE_FIXED_OVERHEAD =
1029      ClassSize.estimateBase(LruBlockCache.class, false);
1030
1031  @Override
1032  public long heapSize() {
1033    return getCurrentSize();
1034  }
1035
1036  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1037    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1038    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1039           + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1040           + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1041  }
1042
1043  @Override
1044  public Iterator<CachedBlock> iterator() {
1045    final Iterator<LruCachedBlock> iterator = map.values().iterator();
1046
1047    return new Iterator<CachedBlock>() {
1048      private final long now = System.nanoTime();
1049
1050      @Override
1051      public boolean hasNext() {
1052        return iterator.hasNext();
1053      }
1054
1055      @Override
1056      public CachedBlock next() {
1057        final LruCachedBlock b = iterator.next();
1058        return new CachedBlock() {
1059          @Override
1060          public String toString() {
1061            return BlockCacheUtil.toString(this, now);
1062          }
1063
1064          @Override
1065          public BlockPriority getBlockPriority() {
1066            return b.getPriority();
1067          }
1068
1069          @Override
1070          public BlockType getBlockType() {
1071            return b.getBuffer().getBlockType();
1072          }
1073
1074          @Override
1075          public long getOffset() {
1076            return b.getCacheKey().getOffset();
1077          }
1078
1079          @Override
1080          public long getSize() {
1081            return b.getBuffer().heapSize();
1082          }
1083
1084          @Override
1085          public long getCachedTime() {
1086            return b.getCachedTime();
1087          }
1088
1089          @Override
1090          public String getFilename() {
1091            return b.getCacheKey().getHfileName();
1092          }
1093
1094          @Override
1095          public int compareTo(CachedBlock other) {
1096            int diff = this.getFilename().compareTo(other.getFilename());
1097            if (diff != 0) {
1098              return diff;
1099            }
1100            diff = Long.compare(this.getOffset(), other.getOffset());
1101            if (diff != 0) {
1102              return diff;
1103            }
1104            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1105              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1106            }
1107            return Long.compare(other.getCachedTime(), this.getCachedTime());
1108          }
1109
1110          @Override
1111          public int hashCode() {
1112            return b.hashCode();
1113          }
1114
1115          @Override
1116          public boolean equals(Object obj) {
1117            if (obj instanceof CachedBlock) {
1118              CachedBlock cb = (CachedBlock)obj;
1119              return compareTo(cb) == 0;
1120            } else {
1121              return false;
1122            }
1123          }
1124        };
1125      }
1126
1127      @Override
1128      public void remove() {
1129        throw new UnsupportedOperationException();
1130      }
1131    };
1132  }
1133
1134  // Simple calculators of sizes given factors and maxSize
1135
1136  long acceptableSize() {
1137    return (long)Math.floor(this.maxSize * this.acceptableFactor);
1138  }
1139  private long minSize() {
1140    return (long)Math.floor(this.maxSize * this.minFactor);
1141  }
1142  private long singleSize() {
1143    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1144  }
1145  private long multiSize() {
1146    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1147  }
1148  private long memorySize() {
1149    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1150  }
1151
1152  @Override
1153  public void shutdown() {
1154    if (victimHandler != null) {
1155      victimHandler.shutdown();
1156    }
1157    this.scheduleThreadPool.shutdown();
1158    for (int i = 0; i < 10; i++) {
1159      if (!this.scheduleThreadPool.isShutdown()) {
1160        try {
1161          Thread.sleep(10);
1162        } catch (InterruptedException e) {
1163          LOG.warn("Interrupted while sleeping");
1164          Thread.currentThread().interrupt();
1165          break;
1166        }
1167      }
1168    }
1169
1170    if (!this.scheduleThreadPool.isShutdown()) {
1171      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1172      LOG.debug("Still running " + runnables);
1173    }
1174    this.evictionThread.shutdown();
1175  }
1176
1177  /** Clears the cache. Used in tests. */
1178  public void clearCache() {
1179    this.map.clear();
1180    this.elements.set(0);
1181  }
1182
1183  /**
1184   * Used in testing. May be very inefficient.
1185   *
1186   * @return the set of cached file names
1187   */
1188  SortedSet<String> getCachedFileNamesForTest() {
1189    SortedSet<String> fileNames = new TreeSet<>();
1190    for (BlockCacheKey cacheKey : map.keySet()) {
1191      fileNames.add(cacheKey.getHfileName());
1192    }
1193    return fileNames;
1194  }
1195
1196  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1197    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1198    for (LruCachedBlock block : map.values()) {
1199      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1200      Integer count = counts.get(encoding);
1201      counts.put(encoding, (count == null ? 0 : count) + 1);
1202    }
1203    return counts;
1204  }
1205
1206  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1207    return map;
1208  }
1209
1210  @Override
1211  public BlockCache[] getBlockCaches() {
1212    if (victimHandler != null) {
1213      return new BlockCache[] { this, this.victimHandler };
1214    }
1215    return null;
1216  }
1217}