001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.lang.ref.WeakReference;
023import java.util.EnumMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.PriorityQueue;
028import java.util.SortedSet;
029import java.util.TreeSet;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.LongAdder;
036import java.util.concurrent.locks.ReentrantLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.io.HeapSize;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.ClassSize;
042import org.apache.hadoop.util.StringUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
048import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
049import org.apache.hbase.thirdparty.com.google.common.base.Objects;
050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
051
052/**
053 * A block cache implementation that is memory-aware using {@link HeapSize},
054 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
055 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
056 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
057 *
058 * Contains three levels of block priority to allow for scan-resistance and in-memory families
059 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
060 * family is a column family that should be served from memory if possible):
061 * single-access, multiple-accesses, and in-memory priority.
062 * A block is added with an in-memory priority flag if
063 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
064 * single access priority the first time it is read into this block cache.  If a block is
065 * accessed again while in cache, it is marked as a multiple access priority block.  This
066 * delineation of blocks is used to prevent scans from thrashing the cache adding a
067 * least-frequently-used element to the eviction algorithm.<p>
068 *
069 * Each priority is given its own chunk of the total cache to ensure
070 * fairness during eviction.  Each priority will retain close to its maximum
071 * size, however, if any priority is not using its entire chunk the others
072 * are able to grow beyond their chunk size.<p>
073 *
074 * Instantiated at a minimum with the total size and average block size.
075 * All sizes are in bytes.  The block size is not especially important as this
076 * cache is fully dynamic in its sizing of blocks.  It is only used for
077 * pre-allocating data structures and in initial heap estimation of the map.<p>
078 *
079 * The detailed constructor defines the sizes for the three priorities (they
080 * should total to the <code>maximum size</code> defined).  It also sets the levels that
081 * trigger and control the eviction thread.<p>
082 *
083 * The <code>acceptable size</code> is the cache size level which triggers the eviction
084 * process to start.  It evicts enough blocks to get the size below the
085 * minimum size specified.<p>
086 *
087 * Eviction happens in a separate thread and involves a single full-scan
088 * of the map.  It determines how many bytes must be freed to reach the minimum
089 * size, and then while scanning determines the fewest least-recently-used
090 * blocks necessary from each of the three priorities (would be 3 times bytes
091 * to free).  It then uses the priority chunk sizes to evict fairly according
092 * to the relative sizes and usage.
093 */
094@InterfaceAudience.Private
095public class LruBlockCache implements FirstLevelBlockCache {
096
097  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
098
099  /**
100   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
101   * evicting during an eviction run till the cache size is down to 80% of the total.
102   */
103  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
104
105  /**
106   * Acceptable size of cache (no evictions if size < acceptable)
107   */
108  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
109      "hbase.lru.blockcache.acceptable.factor";
110
111  /**
112   * Hard capacity limit of cache, will reject any put if size > this * acceptable
113   */
114  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
115      "hbase.lru.blockcache.hard.capacity.limit.factor";
116  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
117      "hbase.lru.blockcache.single.percentage";
118  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
119      "hbase.lru.blockcache.multi.percentage";
120  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
121      "hbase.lru.blockcache.memory.percentage";
122
123  /**
124   * Configuration key to force data-block always (except in-memory are too much)
125   * cached in memory for in-memory hfile, unlike inMemory, which is a column-family
126   * configuration, inMemoryForceMode is a cluster-wide configuration
127   */
128  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
129      "hbase.lru.rs.inmemoryforcemode";
130
131  /* Default Configuration Parameters*/
132
133  /* Backing Concurrent Map Configuration */
134  static final float DEFAULT_LOAD_FACTOR = 0.75f;
135  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
136
137  /* Eviction thresholds */
138  private static final float DEFAULT_MIN_FACTOR = 0.95f;
139  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
140
141  /* Priority buckets */
142  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
143  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
144  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
145
146  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
147
148  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
149
150  /* Statistics thread */
151  private static final int STAT_THREAD_PERIOD = 60 * 5;
152  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
153  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
154
155  /**
156   * Defined the cache map as {@link ConcurrentHashMap} here, because in
157   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
158   * (key, func). Besides, the func method must execute exactly once only when the key is present
159   * and under the lock context, otherwise the reference count will be messed up. Notice that the
160   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
161   */
162  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
163
164  /** Eviction lock (locked when eviction in process) */
165  private transient final ReentrantLock evictionLock = new ReentrantLock(true);
166
167  private final long maxBlockSize;
168
169  /** Volatile boolean to track if we are in an eviction process or not */
170  private volatile boolean evictionInProgress = false;
171
172  /** Eviction thread */
173  private transient final EvictionThread evictionThread;
174
175  /** Statistics thread schedule pool (for heavy debugging, could remove) */
176  private transient final ScheduledExecutorService scheduleThreadPool =
177    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
178      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
179
180  /** Current size of cache */
181  private final AtomicLong size;
182
183  /** Current size of data blocks */
184  private final LongAdder dataBlockSize;
185
186  /** Current number of cached elements */
187  private final AtomicLong elements;
188
189  /** Current number of cached data block elements */
190  private final LongAdder dataBlockElements;
191
192  /** Cache access count (sequential ID) */
193  private final AtomicLong count;
194
195  /** hard capacity limit */
196  private float hardCapacityLimitFactor;
197
198  /** Cache statistics */
199  private final CacheStats stats;
200
201  /** Maximum allowable size of cache (block put if size > max, evict) */
202  private long maxSize;
203
204  /** Approximate block size */
205  private long blockSize;
206
207  /** Acceptable size of cache (no evictions if size < acceptable) */
208  private float acceptableFactor;
209
210  /** Minimum threshold of cache (when evicting, evict until size < min) */
211  private float minFactor;
212
213  /** Single access bucket size */
214  private float singleFactor;
215
216  /** Multiple access bucket size */
217  private float multiFactor;
218
219  /** In-memory bucket size */
220  private float memoryFactor;
221
222  /** Overhead of the structure itself */
223  private long overhead;
224
225  /** Whether in-memory hfile's data block has higher priority when evicting */
226  private boolean forceInMemory;
227
228  /**
229   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
230   * external cache as L2.
231   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
232   */
233  private transient BlockCache victimHandler = null;
234
235  /**
236   * Default constructor.  Specify maximum size and expected average block
237   * size (approximation is fine).
238   *
239   * <p>All other factors will be calculated based on defaults specified in
240   * this class.
241   *
242   * @param maxSize   maximum size of cache, in bytes
243   * @param blockSize approximate size of each block, in bytes
244   */
245  public LruBlockCache(long maxSize, long blockSize) {
246    this(maxSize, blockSize, true);
247  }
248
249  /**
250   * Constructor used for testing.  Allows disabling of the eviction thread.
251   */
252  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
253    this(maxSize, blockSize, evictionThread,
254        (int) Math.ceil(1.2 * maxSize / blockSize),
255        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
256        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
257        DEFAULT_SINGLE_FACTOR,
258        DEFAULT_MULTI_FACTOR,
259        DEFAULT_MEMORY_FACTOR,
260        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
261        false,
262        DEFAULT_MAX_BLOCK_SIZE);
263  }
264
265  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
266    this(maxSize, blockSize, evictionThread,
267        (int) Math.ceil(1.2 * maxSize / blockSize),
268        DEFAULT_LOAD_FACTOR,
269        DEFAULT_CONCURRENCY_LEVEL,
270        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
271        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
272        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
273        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
274        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
275        conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
276                      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
277        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
278        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
279  }
280
281  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
282    this(maxSize, blockSize, true, conf);
283  }
284
285  /**
286   * Configurable constructor.  Use this constructor if not using defaults.
287   *
288   * @param maxSize             maximum size of this cache, in bytes
289   * @param blockSize           expected average size of blocks, in bytes
290   * @param evictionThread      whether to run evictions in a bg thread or not
291   * @param mapInitialSize      initial size of backing ConcurrentHashMap
292   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
293   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
294   * @param minFactor           percentage of total size that eviction will evict until
295   * @param acceptableFactor    percentage of total size that triggers eviction
296   * @param singleFactor        percentage of total size for single-access blocks
297   * @param multiFactor         percentage of total size for multiple-access blocks
298   * @param memoryFactor        percentage of total size for in-memory blocks
299   */
300  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
301      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
302      float minFactor, float acceptableFactor, float singleFactor,
303      float multiFactor, float memoryFactor, float hardLimitFactor,
304      boolean forceInMemory, long maxBlockSize) {
305    this.maxBlockSize = maxBlockSize;
306    if(singleFactor + multiFactor + memoryFactor != 1 ||
307        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
308      throw new IllegalArgumentException("Single, multi, and memory factors " +
309          " should be non-negative and total 1.0");
310    }
311    if (minFactor >= acceptableFactor) {
312      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
313    }
314    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
315      throw new IllegalArgumentException("all factors must be < 1");
316    }
317    this.maxSize = maxSize;
318    this.blockSize = blockSize;
319    this.forceInMemory = forceInMemory;
320    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
321    this.minFactor = minFactor;
322    this.acceptableFactor = acceptableFactor;
323    this.singleFactor = singleFactor;
324    this.multiFactor = multiFactor;
325    this.memoryFactor = memoryFactor;
326    this.stats = new CacheStats(this.getClass().getSimpleName());
327    this.count = new AtomicLong(0);
328    this.elements = new AtomicLong(0);
329    this.dataBlockElements = new LongAdder();
330    this.dataBlockSize = new LongAdder();
331    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
332    this.size = new AtomicLong(this.overhead);
333    this.hardCapacityLimitFactor = hardLimitFactor;
334    if (evictionThread) {
335      this.evictionThread = new EvictionThread(this);
336      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
337    } else {
338      this.evictionThread = null;
339    }
340    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
341    // every five minutes.
342    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
343                                                STAT_THREAD_PERIOD, TimeUnit.SECONDS);
344  }
345
346  @Override
347  public void setVictimCache(BlockCache victimCache) {
348    if (victimHandler != null) {
349      throw new IllegalArgumentException("The victim cache has already been set");
350    }
351    victimHandler = requireNonNull(victimCache);
352  }
353
354  @Override
355  public void setMaxSize(long maxSize) {
356    this.maxSize = maxSize;
357    if (this.size.get() > acceptableSize() && !evictionInProgress) {
358      runEviction();
359    }
360  }
361
362  /**
363   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
364   * access will be more faster then off-heap, the small index block or meta block cached in
365   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
366   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
367   * heap size will be messed up. Here we will clone the block into an heap block if it's an
368   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
369   * the block (HBASE-22127): <br>
370   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
371   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
372   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
373   * JVM, so need a retain here.
374   * @param buf the original block
375   * @return an block with an heap memory backend.
376   */
377  private Cacheable asReferencedHeapBlock(Cacheable buf) {
378    if (buf instanceof HFileBlock) {
379      HFileBlock blk = ((HFileBlock) buf);
380      if (blk.isSharedMem()) {
381        return HFileBlock.deepCloneOnHeap(blk);
382      }
383    }
384    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
385    return buf.retain();
386  }
387
388  // BlockCache implementation
389
390  /**
391   * Cache the block with the specified name and buffer.
392   * <p>
393   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
394   * this can happen, for which we compare the buffer contents.
395   *
396   * @param cacheKey block's cache key
397   * @param buf      block buffer
398   * @param inMemory if block is in-memory
399   */
400  @Override
401  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
402    if (buf.heapSize() > maxBlockSize) {
403      // If there are a lot of blocks that are too
404      // big this can make the logs way too noisy.
405      // So we log 2%
406      if (stats.failInsert() % 50 == 0) {
407        LOG.warn("Trying to cache too large a block "
408            + cacheKey.getHfileName() + " @ "
409            + cacheKey.getOffset()
410            + " is " + buf.heapSize()
411            + " which is larger than " + maxBlockSize);
412      }
413      return;
414    }
415
416    LruCachedBlock cb = map.get(cacheKey);
417    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
418      return;
419    }
420    long currentSize = size.get();
421    long currentAcceptableSize = acceptableSize();
422    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
423    if (currentSize >= hardLimitSize) {
424      stats.failInsert();
425      if (LOG.isTraceEnabled()) {
426        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
427          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
428          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
429          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
430      }
431      if (!evictionInProgress) {
432        runEviction();
433      }
434      return;
435    }
436    // Ensure that the block is an heap one.
437    buf = asReferencedHeapBlock(buf);
438    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
439    long newSize = updateSizeMetrics(cb, false);
440    map.put(cacheKey, cb);
441    long val = elements.incrementAndGet();
442    if (buf.getBlockType().isData()) {
443       dataBlockElements.increment();
444    }
445    if (LOG.isTraceEnabled()) {
446      long size = map.size();
447      assertCounterSanity(size, val);
448    }
449    if (newSize > currentAcceptableSize && !evictionInProgress) {
450      runEviction();
451    }
452  }
453
454  /**
455   * Sanity-checking for parity between actual block cache content and metrics.
456   * Intended only for use with TRACE level logging and -ea JVM.
457   */
458  private static void assertCounterSanity(long mapSize, long counterVal) {
459    if (counterVal < 0) {
460      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
461        ", mapSize=" + mapSize);
462      return;
463    }
464    if (mapSize < Integer.MAX_VALUE) {
465      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
466      if (pct_diff > 0.05) {
467        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
468          ", mapSize=" + mapSize);
469      }
470    }
471  }
472
473  /**
474   * Cache the block with the specified name and buffer.
475   * <p>
476   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
477   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
478   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
479   * otherwise the caching size is based on off-heap.
480   * @param cacheKey block's cache key
481   * @param buf block buffer
482   */
483  @Override
484  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
485    cacheBlock(cacheKey, buf, false);
486  }
487
488  /**
489   * Helper function that updates the local size counter and also updates any
490   * per-cf or per-blocktype metrics it can discern from given
491   * {@link LruCachedBlock}
492   */
493  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
494    long heapsize = cb.heapSize();
495    BlockType bt = cb.getBuffer().getBlockType();
496    if (evict) {
497      heapsize *= -1;
498    }
499    if (bt != null && bt.isData()) {
500       dataBlockSize.add(heapsize);
501    }
502    return size.addAndGet(heapsize);
503  }
504
505  /**
506   * Get the buffer of the block with the specified name.
507   *
508   * @param cacheKey           block's cache key
509   * @param caching            true if the caller caches blocks on cache misses
510   * @param repeat             Whether this is a repeat lookup for the same block
511   *                           (used to avoid double counting cache misses when doing double-check
512   *                           locking)
513   * @param updateCacheMetrics Whether to update cache metrics or not
514   *
515   * @return buffer of specified cache key, or null if not in cache
516   */
517  @Override
518  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
519      boolean updateCacheMetrics) {
520    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
521      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
522      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
523      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
524      // see HBASE-22422.
525      val.getBuffer().retain();
526      return val;
527    });
528    if (cb == null) {
529      if (!repeat && updateCacheMetrics) {
530        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
531      }
532      // If there is another block cache then try and read there.
533      // However if this is a retry ( second time in double checked locking )
534      // And it's already a miss then the l2 will also be a miss.
535      if (victimHandler != null && !repeat) {
536        // The handler will increase result's refCnt for RPC, so need no extra retain.
537        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
538        // Promote this to L1.
539        if (result != null) {
540          if (caching) {
541            cacheBlock(cacheKey, result, /* inMemory = */ false);
542          }
543        }
544        return result;
545      }
546      return null;
547    }
548    if (updateCacheMetrics) {
549      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
550    }
551    cb.access(count.incrementAndGet());
552    return cb.getBuffer();
553  }
554
555  /**
556   * Whether the cache contains block with specified cacheKey
557   *
558   * @return true if contains the block
559   */
560  @Override
561  public boolean containsBlock(BlockCacheKey cacheKey) {
562    return map.containsKey(cacheKey);
563  }
564
565  @Override
566  public boolean evictBlock(BlockCacheKey cacheKey) {
567    LruCachedBlock cb = map.get(cacheKey);
568    return cb != null && evictBlock(cb, false) > 0;
569  }
570
571  /**
572   * Evicts all blocks for a specific HFile. This is an
573   * expensive operation implemented as a linear-time search through all blocks
574   * in the cache. Ideally this should be a search in a log-access-time map.
575   *
576   * <p>
577   * This is used for evict-on-close to remove all blocks of a specific HFile.
578   *
579   * @return the number of blocks evicted
580   */
581  @Override
582  public int evictBlocksByHfileName(String hfileName) {
583    int numEvicted = 0;
584    for (BlockCacheKey key : map.keySet()) {
585      if (key.getHfileName().equals(hfileName)) {
586        if (evictBlock(key))
587          ++numEvicted;
588      }
589    }
590    if (victimHandler != null) {
591      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
592    }
593    return numEvicted;
594  }
595
596  /**
597   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
598   * block may be read again later
599   *
600   * @param evictedByEvictionProcess true if the given block is evicted by
601   *          EvictionThread
602   * @return the heap size of evicted block
603   */
604  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
605    LruCachedBlock previous = map.remove(block.getCacheKey());
606    if (previous == null) {
607      return 0;
608    }
609    updateSizeMetrics(block, true);
610    long val = elements.decrementAndGet();
611    if (LOG.isTraceEnabled()) {
612      long size = map.size();
613      assertCounterSanity(size, val);
614    }
615    if (block.getBuffer().getBlockType().isData()) {
616      dataBlockElements.decrement();
617    }
618    if (evictedByEvictionProcess) {
619      // When the eviction of the block happened because of invalidation of HFiles, no need to
620      // update the stats counter.
621      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
622      if (victimHandler != null) {
623        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
624      }
625    }
626    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
627    // NOT move this up because if do that then the victimHandler may access the buffer with
628    // refCnt = 0 which is disallowed.
629    previous.getBuffer().release();
630    return block.heapSize();
631  }
632
633  /**
634   * Multi-threaded call to run the eviction process.
635   */
636  private void runEviction() {
637    if (evictionThread == null) {
638      evict();
639    } else {
640      evictionThread.evict();
641    }
642  }
643
644  @VisibleForTesting
645  boolean isEvictionInProgress() {
646    return evictionInProgress;
647  }
648
649  @VisibleForTesting
650  long getOverhead() {
651    return overhead;
652  }
653
654  /**
655   * Eviction method.
656   */
657  void evict() {
658
659    // Ensure only one eviction at a time
660    if(!evictionLock.tryLock()) return;
661
662    try {
663      evictionInProgress = true;
664      long currentSize = this.size.get();
665      long bytesToFree = currentSize - minSize();
666
667      if (LOG.isTraceEnabled()) {
668        LOG.trace("Block cache LRU eviction started; Attempting to free " +
669          StringUtils.byteDesc(bytesToFree) + " of total=" +
670          StringUtils.byteDesc(currentSize));
671      }
672
673      if (bytesToFree <= 0) return;
674
675      // Instantiate priority buckets
676      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
677      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
678      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
679
680      // Scan entire map putting into appropriate buckets
681      for (LruCachedBlock cachedBlock : map.values()) {
682        switch (cachedBlock.getPriority()) {
683          case SINGLE: {
684            bucketSingle.add(cachedBlock);
685            break;
686          }
687          case MULTI: {
688            bucketMulti.add(cachedBlock);
689            break;
690          }
691          case MEMORY: {
692            bucketMemory.add(cachedBlock);
693            break;
694          }
695        }
696      }
697
698      long bytesFreed = 0;
699      if (forceInMemory || memoryFactor > 0.999f) {
700        long s = bucketSingle.totalSize();
701        long m = bucketMulti.totalSize();
702        if (bytesToFree > (s + m)) {
703          // this means we need to evict blocks in memory bucket to make room,
704          // so the single and multi buckets will be emptied
705          bytesFreed = bucketSingle.free(s);
706          bytesFreed += bucketMulti.free(m);
707          if (LOG.isTraceEnabled()) {
708            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
709              " from single and multi buckets");
710          }
711          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
712          if (LOG.isTraceEnabled()) {
713            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
714              " total from all three buckets ");
715          }
716        } else {
717          // this means no need to evict block in memory bucket,
718          // and we try best to make the ratio between single-bucket and
719          // multi-bucket is 1:2
720          long bytesRemain = s + m - bytesToFree;
721          if (3 * s <= bytesRemain) {
722            // single-bucket is small enough that no eviction happens for it
723            // hence all eviction goes from multi-bucket
724            bytesFreed = bucketMulti.free(bytesToFree);
725          } else if (3 * m <= 2 * bytesRemain) {
726            // multi-bucket is small enough that no eviction happens for it
727            // hence all eviction goes from single-bucket
728            bytesFreed = bucketSingle.free(bytesToFree);
729          } else {
730            // both buckets need to evict some blocks
731            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
732            if (bytesFreed < bytesToFree) {
733              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
734            }
735          }
736        }
737      } else {
738        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
739
740        bucketQueue.add(bucketSingle);
741        bucketQueue.add(bucketMulti);
742        bucketQueue.add(bucketMemory);
743
744        int remainingBuckets = bucketQueue.size();
745
746        BlockBucket bucket;
747        while ((bucket = bucketQueue.poll()) != null) {
748          long overflow = bucket.overflow();
749          if (overflow > 0) {
750            long bucketBytesToFree =
751                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
752            bytesFreed += bucket.free(bucketBytesToFree);
753          }
754          remainingBuckets--;
755        }
756      }
757      if (LOG.isTraceEnabled()) {
758        long single = bucketSingle.totalSize();
759        long multi = bucketMulti.totalSize();
760        long memory = bucketMemory.totalSize();
761        LOG.trace("Block cache LRU eviction completed; " +
762          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
763          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
764          "single=" + StringUtils.byteDesc(single) + ", " +
765          "multi=" + StringUtils.byteDesc(multi) + ", " +
766          "memory=" + StringUtils.byteDesc(memory));
767      }
768    } finally {
769      stats.evict();
770      evictionInProgress = false;
771      evictionLock.unlock();
772    }
773  }
774
775  @Override
776  public String toString() {
777    return MoreObjects.toStringHelper(this)
778      .add("blockCount", getBlockCount())
779      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
780      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
781      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
782      .add("heapSize", StringUtils.byteDesc(heapSize()))
783      .add("minSize", StringUtils.byteDesc(minSize()))
784      .add("minFactor", minFactor)
785      .add("multiSize", StringUtils.byteDesc(multiSize()))
786      .add("multiFactor", multiFactor)
787      .add("singleSize", StringUtils.byteDesc(singleSize()))
788      .add("singleFactor", singleFactor)
789      .toString();
790  }
791
792  /**
793   * Used to group blocks into priority buckets.  There will be a BlockBucket
794   * for each priority (single, multi, memory).  Once bucketed, the eviction
795   * algorithm takes the appropriate number of elements out of each according
796   * to configuration parameters and their relatives sizes.
797   */
798  private class BlockBucket implements Comparable<BlockBucket> {
799
800    private final String name;
801    private LruCachedBlockQueue queue;
802    private long totalSize = 0;
803    private long bucketSize;
804
805    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
806      this.name = name;
807      this.bucketSize = bucketSize;
808      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
809      totalSize = 0;
810    }
811
812    public void add(LruCachedBlock block) {
813      totalSize += block.heapSize();
814      queue.add(block);
815    }
816
817    public long free(long toFree) {
818      if (LOG.isTraceEnabled()) {
819        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
820      }
821      LruCachedBlock cb;
822      long freedBytes = 0;
823      while ((cb = queue.pollLast()) != null) {
824        freedBytes += evictBlock(cb, true);
825        if (freedBytes >= toFree) {
826          return freedBytes;
827        }
828      }
829      if (LOG.isTraceEnabled()) {
830        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
831      }
832      return freedBytes;
833    }
834
835    public long overflow() {
836      return totalSize - bucketSize;
837    }
838
839    public long totalSize() {
840      return totalSize;
841    }
842
843    @Override
844    public int compareTo(BlockBucket that) {
845      return Long.compare(this.overflow(), that.overflow());
846    }
847
848    @Override
849    public boolean equals(Object that) {
850      if (that == null || !(that instanceof BlockBucket)) {
851        return false;
852      }
853      return compareTo((BlockBucket)that) == 0;
854    }
855
856    @Override
857    public int hashCode() {
858      return Objects.hashCode(name, bucketSize, queue, totalSize);
859    }
860
861    @Override
862    public String toString() {
863      return MoreObjects.toStringHelper(this)
864        .add("name", name)
865        .add("totalSize", StringUtils.byteDesc(totalSize))
866        .add("bucketSize", StringUtils.byteDesc(bucketSize))
867        .toString();
868    }
869  }
870
871  /**
872   * Get the maximum size of this cache.
873   *
874   * @return max size in bytes
875   */
876
877  @Override
878  public long getMaxSize() {
879    return this.maxSize;
880  }
881
882  @Override
883  public long getCurrentSize() {
884    return this.size.get();
885  }
886
887  @Override
888  public long getCurrentDataSize() {
889    return this.dataBlockSize.sum();
890  }
891
892  @Override
893  public long getFreeSize() {
894    return getMaxSize() - getCurrentSize();
895  }
896
897  @Override
898  public long size() {
899    return getMaxSize();
900  }
901
902  @Override
903  public long getBlockCount() {
904    return this.elements.get();
905  }
906
907  @Override
908  public long getDataBlockCount() {
909    return this.dataBlockElements.sum();
910  }
911
912  EvictionThread getEvictionThread() {
913    return this.evictionThread;
914  }
915
916  /*
917   * Eviction thread.  Sits in waiting state until an eviction is triggered
918   * when the cache size grows above the acceptable level.<p>
919   *
920   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
921   */
922  static class EvictionThread extends Thread {
923
924    private WeakReference<LruBlockCache> cache;
925    private volatile boolean go = true;
926    // flag set after enter the run method, used for test
927    private boolean enteringRun = false;
928
929    public EvictionThread(LruBlockCache cache) {
930      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
931      setDaemon(true);
932      this.cache = new WeakReference<>(cache);
933    }
934
935    @Override
936    public void run() {
937      enteringRun = true;
938      while (this.go) {
939        synchronized (this) {
940          try {
941            this.wait(1000 * 10/*Don't wait for ever*/);
942          } catch (InterruptedException e) {
943            LOG.warn("Interrupted eviction thread ", e);
944            Thread.currentThread().interrupt();
945          }
946        }
947        LruBlockCache cache = this.cache.get();
948        if (cache == null) break;
949        cache.evict();
950      }
951    }
952
953    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
954        justification="This is what we want")
955    public void evict() {
956      synchronized (this) {
957        this.notifyAll();
958      }
959    }
960
961    synchronized void shutdown() {
962      this.go = false;
963      this.notifyAll();
964    }
965
966    /**
967     * Used for the test.
968     */
969    boolean isEnteringRun() {
970      return this.enteringRun;
971    }
972  }
973
974  /*
975   * Statistics thread.  Periodically prints the cache statistics to the log.
976   */
977  static class StatisticsThread extends Thread {
978
979    private final LruBlockCache lru;
980
981    public StatisticsThread(LruBlockCache lru) {
982      super("LruBlockCacheStats");
983      setDaemon(true);
984      this.lru = lru;
985    }
986
987    @Override
988    public void run() {
989      lru.logStats();
990    }
991  }
992
993  public void logStats() {
994    // Log size
995    long totalSize = heapSize();
996    long freeSize = maxSize - totalSize;
997    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
998        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
999        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
1000        "blockCount=" + getBlockCount() + ", " +
1001        "accesses=" + stats.getRequestCount() + ", " +
1002        "hits=" + stats.getHitCount() + ", " +
1003        "hitRatio=" + (stats.getHitCount() == 0 ?
1004          "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
1005        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
1006        "cachingHits=" + stats.getHitCachingCount() + ", " +
1007        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
1008          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
1009        "evictions=" + stats.getEvictionCount() + ", " +
1010        "evicted=" + stats.getEvictedCount() + ", " +
1011        "evictedPerRun=" + stats.evictedPerEviction());
1012  }
1013
1014  /**
1015   * Get counter statistics for this cache.
1016   *
1017   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
1018   * of the eviction processes.
1019   */
1020  @Override
1021  public CacheStats getStats() {
1022    return this.stats;
1023  }
1024
1025  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
1026      (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) +
1027      (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
1028      + ClassSize.OBJECT);
1029
1030  @Override
1031  public long heapSize() {
1032    return getCurrentSize();
1033  }
1034
1035  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
1036    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
1037    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
1038           + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
1039           + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
1040  }
1041
1042  @Override
1043  public Iterator<CachedBlock> iterator() {
1044    final Iterator<LruCachedBlock> iterator = map.values().iterator();
1045
1046    return new Iterator<CachedBlock>() {
1047      private final long now = System.nanoTime();
1048
1049      @Override
1050      public boolean hasNext() {
1051        return iterator.hasNext();
1052      }
1053
1054      @Override
1055      public CachedBlock next() {
1056        final LruCachedBlock b = iterator.next();
1057        return new CachedBlock() {
1058          @Override
1059          public String toString() {
1060            return BlockCacheUtil.toString(this, now);
1061          }
1062
1063          @Override
1064          public BlockPriority getBlockPriority() {
1065            return b.getPriority();
1066          }
1067
1068          @Override
1069          public BlockType getBlockType() {
1070            return b.getBuffer().getBlockType();
1071          }
1072
1073          @Override
1074          public long getOffset() {
1075            return b.getCacheKey().getOffset();
1076          }
1077
1078          @Override
1079          public long getSize() {
1080            return b.getBuffer().heapSize();
1081          }
1082
1083          @Override
1084          public long getCachedTime() {
1085            return b.getCachedTime();
1086          }
1087
1088          @Override
1089          public String getFilename() {
1090            return b.getCacheKey().getHfileName();
1091          }
1092
1093          @Override
1094          public int compareTo(CachedBlock other) {
1095            int diff = this.getFilename().compareTo(other.getFilename());
1096            if (diff != 0) return diff;
1097            diff = Long.compare(this.getOffset(), other.getOffset());
1098            if (diff != 0) return diff;
1099            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1100              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
1101            }
1102            return Long.compare(other.getCachedTime(), this.getCachedTime());
1103          }
1104
1105          @Override
1106          public int hashCode() {
1107            return b.hashCode();
1108          }
1109
1110          @Override
1111          public boolean equals(Object obj) {
1112            if (obj instanceof CachedBlock) {
1113              CachedBlock cb = (CachedBlock)obj;
1114              return compareTo(cb) == 0;
1115            } else {
1116              return false;
1117            }
1118          }
1119        };
1120      }
1121
1122      @Override
1123      public void remove() {
1124        throw new UnsupportedOperationException();
1125      }
1126    };
1127  }
1128
1129  // Simple calculators of sizes given factors and maxSize
1130
1131  long acceptableSize() {
1132    return (long)Math.floor(this.maxSize * this.acceptableFactor);
1133  }
1134  private long minSize() {
1135    return (long)Math.floor(this.maxSize * this.minFactor);
1136  }
1137  private long singleSize() {
1138    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1139  }
1140  private long multiSize() {
1141    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1142  }
1143  private long memorySize() {
1144    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1145  }
1146
1147  @Override
1148  public void shutdown() {
1149    if (victimHandler != null) {
1150      victimHandler.shutdown();
1151    }
1152    this.scheduleThreadPool.shutdown();
1153    for (int i = 0; i < 10; i++) {
1154      if (!this.scheduleThreadPool.isShutdown()) {
1155        try {
1156          Thread.sleep(10);
1157        } catch (InterruptedException e) {
1158          LOG.warn("Interrupted while sleeping");
1159          Thread.currentThread().interrupt();
1160          break;
1161        }
1162      }
1163    }
1164
1165    if (!this.scheduleThreadPool.isShutdown()) {
1166      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1167      LOG.debug("Still running " + runnables);
1168    }
1169    this.evictionThread.shutdown();
1170  }
1171
1172  /** Clears the cache. Used in tests. */
1173  @VisibleForTesting
1174  public void clearCache() {
1175    this.map.clear();
1176    this.elements.set(0);
1177  }
1178
1179  /**
1180   * Used in testing. May be very inefficient.
1181   *
1182   * @return the set of cached file names
1183   */
1184  @VisibleForTesting
1185  SortedSet<String> getCachedFileNamesForTest() {
1186    SortedSet<String> fileNames = new TreeSet<>();
1187    for (BlockCacheKey cacheKey : map.keySet()) {
1188      fileNames.add(cacheKey.getHfileName());
1189    }
1190    return fileNames;
1191  }
1192
1193  @VisibleForTesting
1194  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1195    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
1196    for (LruCachedBlock block : map.values()) {
1197      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1198      Integer count = counts.get(encoding);
1199      counts.put(encoding, (count == null ? 0 : count) + 1);
1200    }
1201    return counts;
1202  }
1203
1204  @VisibleForTesting
1205  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1206    return map;
1207  }
1208
1209  @Override
1210  public BlockCache[] getBlockCaches() {
1211    if (victimHandler != null) {
1212      return new BlockCache[] { this, this.victimHandler };
1213    }
1214    return null;
1215  }
1216}