/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * A block cache implementation that is memory-aware using {@link HeapSize},
 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
 *
 * Contains three levels of block priority to allow for scan-resistance and in-memory families
 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (an in-memory column
 * family is a column family that should be served from memory if possible):
 * single-access, multiple-access, and in-memory priority.
 * A block is added with an in-memory priority flag if
 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}; otherwise a block becomes
 * single-access priority the first time it is read into this block cache.  If a block is
 * accessed again while in cache, it is marked as a multiple-access priority block.  This
 * delineation of blocks is used to prevent scans from thrashing the cache, adding a
 * least-frequently-used element to the eviction algorithm.<p>
 *
 * Each priority is given its own chunk of the total cache to ensure
 * fairness during eviction.  Each priority will retain close to its maximum
 * size; however, if any priority is not using its entire chunk the others
 * are able to grow beyond their chunk size.<p>
 *
 * Instantiated at a minimum with the total size and average block size.
 * All sizes are in bytes.  The block size is not especially important as this
 * cache is fully dynamic in its sizing of blocks.  It is only used for
 * pre-allocating data structures and in initial heap estimation of the map.<p>
 *
 * The detailed constructor defines the sizes for the three priorities (they
 * should total to the <code>maximum size</code> defined).  It also sets the levels that
 * trigger and control the eviction thread.<p>
 *
 * The <code>acceptable size</code> is the cache size level which triggers the eviction
 * process to start.  It evicts enough blocks to get the size below the
 * minimum size specified.<p>
 *
 * Eviction happens in a separate thread and involves a single full scan
 * of the map.  It determines how many bytes must be freed to reach the minimum
 * size, and then, while scanning, determines the fewest least-recently-used
 * blocks necessary from each of the three priorities (which would be 3 times the bytes
 * to free).  It then uses the priority chunk sizes to evict fairly according
 * to the relative sizes and usage.
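 *
 * <p>A minimal usage sketch; the sizes and the block instance below are illustrative only,
 * not recommendations (any {@link Cacheable}, such as an {@link HFileBlock}, can be cached):
 * <pre>{@code
 *   // 1 GB cache with an expected average block size of 64 KB; all other factors default.
 *   LruBlockCache cache = new LruBlockCache(1024L * 1024L * 1024L, 64L * 1024L);
 *   BlockCacheKey key = new BlockCacheKey("example-hfile-name", 0);
 *   cache.cacheBlock(key, block);                         // block is some Cacheable
 *   Cacheable cached = cache.getBlock(key, true, false, true);
 * }</pre>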
 */
@InterfaceAudience.Private
@JsonIgnoreProperties({"encodingCountsForTest"})
public class LruBlockCache implements ResizableBlockCache, HeapSize {

  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache (no evictions if size < acceptable)
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of cache, will reject any put if size > this * acceptable
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.hard.capacity.limit.factor";
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key to force data blocks of in-memory hfiles to always be cached
   * (unless the in-memory blocks alone are too large for the cache). Unlike inMemory,
   * which is a column-family configuration, inMemoryForceMode is a cluster-wide configuration.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
      "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  /** Concurrent map (the cache) */
  private final Map<BlockCacheKey, LruCachedBlock> map;

  /** Eviction lock (locked when eviction in process) */
  private final ReentrantLock evictionLock = new ReentrantLock(true);
  private final long maxBlockSize;

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread */
  private final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache */
  private final AtomicLong size;

  /** Current size of data blocks */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /** Cache access count (sequential ID) */
  private final AtomicLong count;

  /** hard capacity limit */
  private float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Overhead of the structure itself */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use
   * an external cache as L2.
   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private BlockCache victimHandler = null;

  /**
   * Default constructor.  Specify maximum size and expected average block
   * size (approximation is fine).
   *
   * <p>All other factors will be calculated based on defaults specified in
   * this class.
   *
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }

  /**
   * Constructor used for testing.  Allows disabling of the eviction thread.
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
    this(maxSize, blockSize, evictionThread,
        (int) Math.ceil(1.2 * maxSize / blockSize),
        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
        DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
        DEFAULT_SINGLE_FACTOR,
        DEFAULT_MULTI_FACTOR,
        DEFAULT_MEMORY_FACTOR,
        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
        false,
        DEFAULT_MAX_BLOCK_SIZE
        );
  }

  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread,
        (int) Math.ceil(1.2 * maxSize / blockSize),
        DEFAULT_LOAD_FACTOR,
        DEFAULT_CONCURRENCY_LEVEL,
        conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
        conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
        conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
        conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
        conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
        conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
                      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
        conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
        conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
    );
  }

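  /**
   * Constructor that reads the eviction thresholds and priority-bucket factors from the given
   * {@link Configuration} and runs the background eviction thread. A minimal sketch of tuning
   * the cache through configuration (the values below are purely illustrative, not
   * recommendations):
   * <pre>{@code
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setFloat(LRU_MIN_FACTOR_CONFIG_NAME, 0.90f);        // evict down to 90% of max
   *   conf.setFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, 0.95f); // start evicting at 95% of max
   *   LruBlockCache cache = new LruBlockCache(maxSize, blockSize, conf);
   * }</pre>
   */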
  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }

  /**
   * Configurable constructor.  Use this constructor if not using defaults.
   *
   * @param maxSize             maximum size of this cache, in bytes
   * @param blockSize           expected average size of blocks, in bytes
   * @param evictionThread      whether to run evictions in a bg thread or not
   * @param mapInitialSize      initial size of backing ConcurrentHashMap
   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
   * @param minFactor           percentage of total size that eviction will evict until
   * @param acceptableFactor    percentage of total size that triggers eviction
   * @param singleFactor        percentage of total size for single-access blocks
   * @param multiFactor         percentage of total size for multiple-access blocks
   * @param memoryFactor        percentage of total size for in-memory blocks
   * @param hardLimitFactor     hard capacity limit, as a multiple of the acceptable size
   * @param forceInMemory       whether in-memory blocks are forcibly kept cached (cluster-wide)
   * @param maxBlockSize        maximum size, in bytes, of a single cacheable block
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
      float minFactor, float acceptableFactor, float singleFactor,
      float multiFactor, float memoryFactor, float hardLimitFactor,
      boolean forceInMemory, long maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
    if (singleFactor + multiFactor + memoryFactor != 1 ||
        singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
      throw new IllegalArgumentException("Single, multi, and memory factors " +
          "should be non-negative and total 1.0");
    }
    if (minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    this.forceInMemory = forceInMemory;
    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats(this.getClass().getSimpleName());
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.dataBlockElements = new LongAdder();
    this.dataBlockSize = new LongAdder();
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    this.size = new AtomicLong(this.overhead);
    this.hardCapacityLimitFactor = hardLimitFactor;
    if (evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }
    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
                                                STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
    if (this.size.get() > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  // BlockCache implementation

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   *
   * @param cacheKey block's cache key
   * @param buf      block buffer
   * @param inMemory if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too big this can make the logs way too noisy.
      // So we only log about 2% of them (every 50th failed insert).
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block "
            + cacheKey.getHfileName() + " @ "
            + cacheKey.getOffset()
            + " is " + buf.heapSize()
            + " which is larger than " + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
      return;
    }
    long currentSize = size.get();
    long currentAcceptableSize = acceptableSize();
    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
    if (currentSize >= hardLimitSize) {
      stats.failInsert();
      if (LOG.isTraceEnabled()) {
        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
      }
      if (!evictionInProgress) {
        runEviction();
      }
      return;
    }
    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (buf.getBlockType().isData()) {
      dataBlockElements.increment();
    }
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > currentAcceptableSize && !evictionInProgress) {
      runEviction();
    }
  }

  /**
   * Sanity-checking for parity between actual block cache content and metrics.
   * Intended only for use with TRACE level logging and -ea JVM.
   */
  private static void assertCounterSanity(long mapSize, long counterVal) {
    if (counterVal < 0) {
      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
        ", mapSize=" + mapSize);
      return;
    }
    if (mapSize < Integer.MAX_VALUE) {
      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
      if (pct_diff > 0.05) {
        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
          ", mapSize=" + mapSize);
      }
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   *
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Helper function that updates the local size counter and also updates any
   * per-cf or per-blocktype metrics it can discern from the given
   * {@link LruCachedBlock}.
   */
  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
    long heapsize = cb.heapSize();
    BlockType bt = cb.getBuffer().getBlockType();
    if (evict) {
      heapsize *= -1;
    }
    if (bt != null && bt.isData()) {
      dataBlockSize.add(heapsize);
    }
    return size.addAndGet(heapsize);
  }

  /**
   * Get the buffer of the block with the specified name.
   *
   * @param cacheKey           block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   *                           (used to avoid double counting cache misses when doing double-check
   *                           locking)
   * @param updateCacheMetrics Whether to update cache metrics or not
   *
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    LruCachedBlock cb = map.get(cacheKey);
    if (cb == null) {
      if (!repeat && updateCacheMetrics) {
        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
      }
      // If there is another block cache then try and read there.
      // However, if this is a retry (second time in double-checked locking)
      // and it's already a miss then the L2 lookup will also be a miss.
      if (victimHandler != null && !repeat) {
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);

        // Promote this to L1.
        if (result != null && caching) {
          if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
            result = ((HFileBlock) result).deepClone();
          }
          cacheBlock(cacheKey, result, /* inMemory = */ false);
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) {
      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
    }
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }

  /**
   * Whether the cache contains the block with the specified cacheKey.
   *
   * @return true if it contains the block
   */
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    LruCachedBlock cb = map.get(cacheKey);
    return cb != null && evictBlock(cb, false) > 0;
  }

  /**
   * Evicts all blocks for a specific HFile. This is an
   * expensive operation implemented as a linear-time search through all blocks
   * in the cache. Ideally this should be a search in a log-access-time map.
   *
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = 0;
    for (BlockCacheKey key : map.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key)) {
          ++numEvicted;
        }
      }
    }
    if (victimHandler != null) {
      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
    }
    return numEvicted;
  }

  /**
   * Evict the block; if a victim handler exists and the block was evicted by the eviction
   * process (so it may be read again later), the block is passed on to the victim handler.
   *
   * @param evictedByEvictionProcess true if the given block is evicted by
   *          EvictionThread
   * @return the heap size of evicted block
   */
  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
    boolean found = map.remove(block.getCacheKey()) != null;
    if (!found) {
      return 0;
    }
    updateSizeMetrics(block, true);
    long val = elements.decrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (block.getBuffer().getBlockType().isData()) {
      dataBlockElements.decrement();
    }
    if (evictedByEvictionProcess) {
      // When the eviction of the block happened because of invalidation of HFiles, no need to
      // update the stats counter.
      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
      if (victimHandler != null) {
        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
      }
    }
    return block.heapSize();
  }

  /**
   * Multi-threaded call to run the eviction process.
   */
  private void runEviction() {
    if (evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

  @VisibleForTesting
  boolean isEvictionInProgress() {
    return evictionInProgress;
  }

  @VisibleForTesting
  long getOverhead() {
    return overhead;
  }

  /**
   * Eviction method.
   */
  void evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) {
      return;
    }

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) {
        return;
      }

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in the memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means there is no need to evict blocks in the memory bucket,
          // and we try to keep the ratio between the single and multi buckets at 1:2
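          //
          // Worked example (illustrative numbers only): with s = 300 MB, m = 500 MB and
          // bytesToFree = 200 MB, bytesRemain is 600 MB. Since 3*s = 900 MB > 600 MB and
          // 3*m = 1500 MB > 2 * 600 MB, both buckets shed blocks: single frees
          // s - bytesRemain/3 = 100 MB and multi frees the remaining 100 MB, leaving
          // roughly 200 MB : 400 MB, i.e. 1:2.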
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            long bucketBytesToFree =
                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
      .add("blockCount", getBlockCount())
      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
      .add("heapSize", StringUtils.byteDesc(heapSize()))
      .add("minSize", StringUtils.byteDesc(minSize()))
      .add("minFactor", minFactor)
      .add("multiSize", StringUtils.byteDesc(multiSize()))
      .add("multiFactor", multiFactor)
      .add("singleSize", StringUtils.byteDesc(singleSize()))
      .add("singleFactor", singleFactor)
      .toString();
  }

  /**
   * Used to group blocks into priority buckets.  There will be a BlockBucket
   * for each priority (single, multi, memory).  Once bucketed, the eviction
   * algorithm takes the appropriate number of elements out of each according
   * to configuration parameters and their relative sizes.
   */
  private class BlockBucket implements Comparable<BlockBucket> {

    private final String name;
    private LruCachedBlockQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
      this.name = name;
      this.bucketSize = bucketSize;
      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(LruCachedBlock block) {
      totalSize += block.heapSize();
      queue.add(block);
    }

    public long free(long toFree) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
      }
      LruCachedBlock cb;
      long freedBytes = 0;
      while ((cb = queue.pollLast()) != null) {
        freedBytes += evictBlock(cb, true);
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BlockBucket that) {
      return Long.compare(this.overflow(), that.overflow());
    }

    @Override
    public boolean equals(Object that) {
      if (that == null || !(that instanceof BlockBucket)) {
        return false;
      }
      return compareTo((BlockBucket) that) == 0;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(name, bucketSize, queue, totalSize);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
        .add("name", name)
        .add("totalSize", StringUtils.byteDesc(totalSize))
        .add("bucketSize", StringUtils.byteDesc(bucketSize))
        .toString();
    }
  }

  /**
   * Get the maximum size of this cache.
   *
   * @return max size in bytes
   */
  @Override
  public long getMaxSize() {
    return this.maxSize;
  }

  @Override
  public long getCurrentSize() {
    return this.size.get();
  }

  @Override
  public long getCurrentDataSize() {
    return this.dataBlockSize.sum();
  }

  @Override
  public long getFreeSize() {
    return getMaxSize() - getCurrentSize();
  }

  @Override
  public long size() {
    return getMaxSize();
  }

  @Override
  public long getBlockCount() {
    return this.elements.get();
  }

  @Override
  public long getDataBlockCount() {
    return this.dataBlockElements.sum();
  }

  EvictionThread getEvictionThread() {
    return this.evictionThread;
  }

  /*
   * Eviction thread.  Sits in waiting state until an eviction is triggered
   * when the cache size grows above the acceptable level.<p>
   *
   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
   */
  static class EvictionThread extends HasThread {

    private WeakReference<LruBlockCache> cache;
    private volatile boolean go = true;
    // flag set after entering the run method; used for testing
    private boolean enteringRun = false;

    public EvictionThread(LruBlockCache cache) {
      super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<>(cache);
    }

    @Override
    public void run() {
      enteringRun = true;
      while (this.go) {
        synchronized (this) {
          try {
            this.wait(1000 * 10/*Don't wait forever*/);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted eviction thread ", e);
            Thread.currentThread().interrupt();
          }
        }
        LruBlockCache cache = this.cache.get();
        if (cache == null) {
          break;
        }
        cache.evict();
      }
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
        justification="This is what we want")
    public void evict() {
      synchronized (this) {
        this.notifyAll();
      }
    }

    synchronized void shutdown() {
      this.go = false;
      this.notifyAll();
    }

    /**
     * Used for testing.
     */
    boolean isEnteringRun() {
      return this.enteringRun;
    }
  }

  /*
   * Statistics thread.  Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends Thread {

    private final LruBlockCache lru;

    public StatisticsThread(LruBlockCache lru) {
      super("LruBlockCacheStats");
      setDaemon(true);
      this.lru = lru;
    }

    @Override
    public void run() {
      lru.logStats();
    }
  }

  public void logStats() {
    // Log size
    long totalSize = heapSize();
    long freeSize = maxSize - totalSize;
    LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
        "blockCount=" + getBlockCount() + ", " +
        "accesses=" + stats.getRequestCount() + ", " +
        "hits=" + stats.getHitCount() + ", " +
        "hitRatio=" + (stats.getHitCount() == 0 ?
          "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
        "cachingHits=" + stats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
          "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
        "evictions=" + stats.getEvictionCount() + ", " +
        "evicted=" + stats.getEvictedCount() + ", " +
        "evictedPerRun=" + stats.evictedPerEviction());
  }

  /**
   * Get counter statistics for this cache.
   *
   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
   * of the eviction processes.
   */
  @Override
  public CacheStats getStats() {
    return this.stats;
  }

  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
      (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) +
      (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN)
      + ClassSize.OBJECT);

  @Override
  public long heapSize() {
    return getCurrentSize();
  }

  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
           + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
           + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    final Iterator<LruCachedBlock> iterator = map.values().iterator();

    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public CachedBlock next() {
        final LruCachedBlock b = iterator.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return b.getPriority();
          }

          @Override
          public BlockType getBlockType() {
            return b.getBuffer().getBlockType();
          }

          @Override
          public long getOffset() {
            return b.getCacheKey().getOffset();
          }

          @Override
          public long getSize() {
            return b.getBuffer().heapSize();
          }

          @Override
          public long getCachedTime() {
            return b.getCachedTime();
          }

          @Override
          public String getFilename() {
            return b.getCacheKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) {
              return diff;
            }
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) {
              return diff;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return b.hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  // Simple calculators of sizes given factors and maxSize
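  // For example, with the defaults (illustrative only): a 1 GB maxSize gives an
  // acceptableSize() of ~0.99 GB and a minSize() of ~0.95 GB, while each priority chunk is
  // its factor of maxSize scaled by minFactor, e.g. singleSize() = 1 GB * 0.25 * 0.95.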

  long acceptableSize() {
    return (long) Math.floor(this.maxSize * this.acceptableFactor);
  }

  private long minSize() {
    return (long) Math.floor(this.maxSize * this.minFactor);
  }

  private long singleSize() {
    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
  }

  private long multiSize() {
    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
  }

  private long memorySize() {
    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }

  @Override
  public void shutdown() {
    if (victimHandler != null) {
      victimHandler.shutdown();
    }
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < 10; i++) {
      if (!this.scheduleThreadPool.isShutdown()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while sleeping");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }

    if (!this.scheduleThreadPool.isShutdown()) {
      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
      LOG.debug("Still running " + runnables);
    }
    // The eviction thread is null when the cache was constructed with evictionThread=false.
    if (this.evictionThread != null) {
      this.evictionThread.shutdown();
    }
  }

  /** Clears the cache. Used in tests. */
  @VisibleForTesting
  public void clearCache() {
    this.map.clear();
    this.elements.set(0);
  }

  /**
   * Used in testing. May be very inefficient.
   *
   * @return the set of cached file names
   */
  @VisibleForTesting
  SortedSet<String> getCachedFileNamesForTest() {
    SortedSet<String> fileNames = new TreeSet<>();
    for (BlockCacheKey cacheKey : map.keySet()) {
      fileNames.add(cacheKey.getHfileName());
    }
    return fileNames;
  }

  @VisibleForTesting
  Map<BlockType, Integer> getBlockTypeCountsForTest() {
    Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class);
    for (LruCachedBlock cb : map.values()) {
      BlockType blockType = cb.getBuffer().getBlockType();
      Integer count = counts.get(blockType);
      counts.put(blockType, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  @VisibleForTesting
  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
    for (LruCachedBlock block : map.values()) {
      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
      Integer count = counts.get(encoding);
      counts.put(encoding, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  public void setVictimCache(BlockCache handler) {
    assert victimHandler == null;
    victimHandler = handler;
  }

  @VisibleForTesting
  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
    return map;
  }

  @Override
  @JsonIgnore
  public BlockCache[] getBlockCaches() {
    if (victimHandler != null) {
      return new BlockCache[] {this, this.victimHandler};
    }
    return null;
  }
}