001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.File;
021import java.io.FileInputStream;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Comparator;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.NavigableSet;
032import java.util.PriorityQueue;
033import java.util.Set;
034import java.util.concurrent.ArrayBlockingQueue;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ConcurrentMap;
038import java.util.concurrent.ConcurrentSkipListSet;
039import java.util.concurrent.Executors;
040import java.util.concurrent.ScheduledExecutorService;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicLong;
044import java.util.concurrent.atomic.LongAdder;
045import java.util.concurrent.locks.Lock;
046import java.util.concurrent.locks.ReentrantLock;
047import java.util.concurrent.locks.ReentrantReadWriteLock;
048import java.util.function.Consumer;
049import java.util.function.Function;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.HBaseConfiguration;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.io.ByteBuffAllocator;
055import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
056import org.apache.hadoop.hbase.io.HeapSize;
057import org.apache.hadoop.hbase.io.hfile.BlockCache;
058import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
059import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
060import org.apache.hadoop.hbase.io.hfile.BlockPriority;
061import org.apache.hadoop.hbase.io.hfile.BlockType;
062import org.apache.hadoop.hbase.io.hfile.CacheStats;
063import org.apache.hadoop.hbase.io.hfile.Cacheable;
064import org.apache.hadoop.hbase.io.hfile.CachedBlock;
065import org.apache.hadoop.hbase.io.hfile.HFileBlock;
066import org.apache.hadoop.hbase.io.hfile.HFileContext;
067import org.apache.hadoop.hbase.nio.ByteBuff;
068import org.apache.hadoop.hbase.nio.RefCnt;
069import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.IdReadWriteLock;
073import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
074import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
075import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
076import org.apache.hadoop.util.StringUtils;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
082import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
083
084import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
085
086/**
087 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
088 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
089 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
090 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
091 * {@link FileIOEngine} to store/read the block data.
092 * <p>
093 * Eviction is via a similar algorithm as used in
094 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
095 * <p>
 * BucketCache can be used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to
 * decrease CMS GC and heap fragmentation.
099 * <p>
100 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
101 * enlarge cache space via a victim cache.
102 */
103@InterfaceAudience.Private
104public class BucketCache implements BlockCache, HeapSize {
105  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);
106
107  /** Priority buckets config */
108  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
109  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
110  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
111  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
112  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
113  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
114
115  /** Use strong reference for offsetLock or not */
116  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
117  private static final boolean STRONG_REF_DEFAULT = false;
118
119  /** Priority buckets */
120  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
121  static final float DEFAULT_MULTI_FACTOR = 0.50f;
122  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
123  static final float DEFAULT_MIN_FACTOR = 0.85f;
124
125  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
126  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
127
128  // Number of blocks to clear for each of the bucket size that is full
129  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
130
131  /** Statistics thread */
132  private static final int statThreadPeriod = 5 * 60;
133
134  final static int DEFAULT_WRITER_THREADS = 3;
135  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
136
137  // Store/read block data
138  transient final IOEngine ioEngine;
139
140  // Store the block in this map before writing it to cache
141  transient final RAMCache ramCache;
142  // In this map, store the block's meta data like offset, length
143  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
144
145  /**
146   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
147   * that Bucket IO exceptions/errors don't bring down the HBase server.
148   */
149  private volatile boolean cacheEnabled;
150
151  /**
152   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
153   * words, the work adding blocks to the BucketCache is divided up amongst the running
154   * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when
155   * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It
156   * then updates the ramCache and backingMap accordingly.
157   */
158  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
159  transient final WriterThread[] writerThreads;
160
161  /** Volatile boolean to track if free space is in process or not */
162  private volatile boolean freeInProgress = false;
163  private transient final Lock freeSpaceLock = new ReentrantLock();
164
165  private final LongAdder realCacheSize = new LongAdder();
166  private final LongAdder heapSize = new LongAdder();
167  /** Current number of cached elements */
168  private final LongAdder blockNumber = new LongAdder();
169
170  /** Cache access count (sequential ID) */
171  private final AtomicLong accessCount = new AtomicLong();
172
173  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
174
175  /**
176   * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
177   * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
178   * IOEngine.
179   */
180  boolean wait_when_cache = false;
181
182  private final BucketCacheStats cacheStats = new BucketCacheStats();
183
184  private final String persistencePath;
185  private final long cacheCapacity;
186  /** Approximate block size */
187  private final long blockSize;
188
189  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
190  private final int ioErrorsTolerationDuration;
191  // 1 min
192  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
193
194  // Start time of first IO error when reading or writing IO Engine, it will be
195  // reset after a successful read/write.
196  private volatile long ioErrorStartTime = -1;
197
198  /**
199   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
200   * this is to avoid freeing the block which is being read.
201   * <p>
202   */
203  transient final IdReadWriteLock<Long> offsetLock;
204
205  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
206    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
207    if (nameComparison != 0) {
208      return nameComparison;
209    }
210    return Long.compare(a.getOffset(), b.getOffset());
211  });
212
213  /** Statistics thread schedule pool (for heavy debugging, could remove) */
214  private transient final ScheduledExecutorService scheduleThreadPool =
215    Executors.newScheduledThreadPool(1,
216      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
217
218  // Allocate or free space for the block
219  private transient BucketAllocator bucketAllocator;
220
221  /** Acceptable size of cache (no evictions if size < acceptable) */
222  private float acceptableFactor;
223
224  /** Minimum threshold of cache (when evicting, evict until size < min) */
225  private float minFactor;
226
227  /**
228   * Free this floating point factor of extra blocks when evicting. For example free the number of
229   * blocks requested * (1 + extraFreeFactor)
230   */
231  private float extraFreeFactor;
232
233  /** Single access bucket size */
234  private float singleFactor;
235
236  /** Multiple access bucket size */
237  private float multiFactor;
238
239  /** In-memory bucket size */
240  private float memoryFactor;
241
242  private static final String FILE_VERIFY_ALGORITHM =
243    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
244  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
245
246  /**
247   * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
248   * integrity, default algorithm is MD5
249   */
250  private String algorithm;
251
252  /* Tracing failed Bucket Cache allocations. */
253  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
254  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
255
256  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
257    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
258    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
259      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
260  }
261
262  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
263    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
264    Configuration conf) throws IOException {
265    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
266    if (useStrongRef) {
267      this.offsetLock = new IdReadWriteLockStrongRef<>();
268    } else {
269      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
270    }
271    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
272    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
273    this.writerThreads = new WriterThread[writerThreadNum];
274    long blockNumCapacity = capacity / blockSize;
275    if (blockNumCapacity >= Integer.MAX_VALUE) {
276      // Enough for about 32TB of cache!
277      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
278    }
279
280    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
281    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
282    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
283    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
284    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
285    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
286
287    sanityCheckConfigs();
288
289    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
290      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
291      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
292      + ", useStrongRef: " + useStrongRef);
293
294    this.cacheCapacity = capacity;
295    this.persistencePath = persistencePath;
296    this.blockSize = blockSize;
297    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
298
299    this.allocFailLogPrevTs = 0;
300
301    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
302    for (int i = 0; i < writerThreads.length; ++i) {
303      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
304    }
305
306    assert writerQueues.size() == writerThreads.length;
307    this.ramCache = new RAMCache();
308
309    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
310
311    if (ioEngine.isPersistent() && persistencePath != null) {
312      try {
313        retrieveFromFile(bucketSizes);
314      } catch (IOException ioex) {
315        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
316      }
317    }
318    final String threadName = Thread.currentThread().getName();
319    this.cacheEnabled = true;
320    for (int i = 0; i < writerThreads.length; ++i) {
321      writerThreads[i] = new WriterThread(writerQueues.get(i));
322      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
323      writerThreads[i].setDaemon(true);
324    }
325    startWriterThreads();
326
327    // Run the statistics thread periodically to print the cache statistics log
328    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
329    // every five minutes.
330    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
331      statThreadPeriod, TimeUnit.SECONDS);
332    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
333      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
334      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
335      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
336  }
337
338  private void sanityCheckConfigs() {
339    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
340      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
341    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
342      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
343    Preconditions.checkArgument(minFactor <= acceptableFactor,
344      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
345    Preconditions.checkArgument(extraFreeFactor >= 0,
346      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
347    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
348      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
349    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
350      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
351    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
352      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
353    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
354      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
355        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
356  }
357
358  /**
359   * Called by the constructor to start the writer threads. Used by tests that need to override
360   * starting the threads.
361   */
362  protected void startWriterThreads() {
363    for (WriterThread thread : writerThreads) {
364      thread.start();
365    }
366  }
367
368  boolean isCacheEnabled() {
369    return this.cacheEnabled;
370  }
371
372  @Override
373  public long getMaxSize() {
374    return this.cacheCapacity;
375  }
376
377  public String getIoEngine() {
378    return ioEngine.toString();
379  }
380
381  /**
382   * Get the IOEngine from the IO engine name nnn * @return the IOEngine n
383   */
384  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
385    throws IOException {
386    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
387      // In order to make the usage simple, we only need the prefix 'files:' in
388      // document whether one or multiple file(s), but also support 'file:' for
389      // the compatibility
390      String[] filePaths =
391        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
392      return new FileIOEngine(capacity, persistencePath != null, filePaths);
393    } else if (ioEngineName.startsWith("offheap")) {
394      return new ByteBufferIOEngine(capacity);
395    } else if (ioEngineName.startsWith("mmap:")) {
396      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
397    } else if (ioEngineName.startsWith("pmem:")) {
398      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
399      // device. Since the persistent memory device has its own address space the contents
400      // mapped to this address space does not get swapped out like in the case of mmapping
401      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
402      // can be directly referred to without having to copy them onheap. Once the RPC is done,
403      // the blocks can be returned back as in case of ByteBufferIOEngine.
404      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
405    } else {
406      throw new IllegalArgumentException(
407        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
408    }
409  }
410
411  /**
412   * Cache the block with the specified name and buffer.
413   * @param cacheKey block's cache key
414   * @param buf      block buffer
415   */
416  @Override
417  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
418    cacheBlock(cacheKey, buf, false);
419  }
420
421  /**
422   * Cache the block with the specified name and buffer.
423   * @param cacheKey   block's cache key
424   * @param cachedItem block buffer
425   * @param inMemory   if block is in-memory
426   */
427  @Override
428  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
429    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
430  }
431
432  /**
433   * Cache the block to ramCache
434   * @param cacheKey   block's cache key
435   * @param cachedItem block buffer
436   * @param inMemory   if block is in-memory
437   * @param wait       if true, blocking wait when queue is full
438   */
439  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
440    boolean wait) {
441    if (cacheEnabled) {
442      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
443        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
444          BucketEntry bucketEntry = backingMap.get(cacheKey);
445          if (bucketEntry != null && bucketEntry.isRpcRef()) {
446            // avoid replace when there are RPC refs for the bucket entry in bucket cache
447            return;
448          }
449          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
450        }
451      } else {
452        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
453      }
454    }
455  }
456
457  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
458    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
459  }
460
461  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
462    boolean inMemory, boolean wait) {
463    if (!cacheEnabled) {
464      return;
465    }
466    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
467    // Stuff the entry into the RAM cache so it can get drained to the persistent store
468    RAMQueueEntry re =
469      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
470    /**
471     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
472     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
473     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
474     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
475     * one, then the heap size will mess up (HBASE-20789)
476     */
477    if (ramCache.putIfAbsent(cacheKey, re) != null) {
478      return;
479    }
480    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
481    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
482    boolean successfulAddition = false;
483    if (wait) {
484      try {
485        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
486      } catch (InterruptedException e) {
487        Thread.currentThread().interrupt();
488      }
489    } else {
490      successfulAddition = bq.offer(re);
491    }
492    if (!successfulAddition) {
493      ramCache.remove(cacheKey);
494      cacheStats.failInsert();
495    } else {
496      this.blockNumber.increment();
497      this.heapSize.add(cachedItem.heapSize());
498    }
499  }
500
501  /**
502   * Get the buffer of the block with the specified key.
503   * @param key                block's cache key
504   * @param caching            true if the caller caches blocks on cache misses
505   * @param repeat             Whether this is a repeat lookup for the same block
506   * @param updateCacheMetrics Whether we should update cache metrics or not
507   * @return buffer of specified cache key, or null if not in cache
508   */
509  @Override
510  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
511    boolean updateCacheMetrics) {
512    if (!cacheEnabled) {
513      return null;
514    }
515    RAMQueueEntry re = ramCache.get(key);
516    if (re != null) {
517      if (updateCacheMetrics) {
518        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
519      }
520      re.access(accessCount.incrementAndGet());
521      return re.getData();
522    }
523    BucketEntry bucketEntry = backingMap.get(key);
524    if (bucketEntry != null) {
525      long start = System.nanoTime();
526      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
527      try {
528        lock.readLock().lock();
529        // We can not read here even if backingMap does contain the given key because its offset
530        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
531        // existence here.
532        if (bucketEntry.equals(backingMap.get(key))) {
533          // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the
534          // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
535          // the same BucketEntry, then all of the three will share the same refCnt.
536          Cacheable cachedBlock = ioEngine.read(bucketEntry);
537          if (ioEngine.usesSharedMemory()) {
538            // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
539            // same RefCnt, do retain here, in order to count the number of RPC references
540            cachedBlock.retain();
541          }
542          // Update the cache statistics.
543          if (updateCacheMetrics) {
544            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
545            cacheStats.ioHit(System.nanoTime() - start);
546          }
547          bucketEntry.access(accessCount.incrementAndGet());
548          if (this.ioErrorStartTime > 0) {
549            ioErrorStartTime = -1;
550          }
551          return cachedBlock;
552        }
553      } catch (IOException ioex) {
554        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
555        checkIOErrorIsTolerated();
556      } finally {
557        lock.readLock().unlock();
558      }
559    }
560    if (!repeat && updateCacheMetrics) {
561      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
562    }
563    return null;
564  }
565
566  /**
567   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
568   */
569  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
570    bucketEntry.markAsEvicted();
571    blocksByHFile.remove(cacheKey);
572    if (decrementBlockNumber) {
573      this.blockNumber.decrement();
574    }
575    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
576  }
577
578  /**
579   * Free the {{@link BucketEntry} actually,which could only be invoked when the
580   * {@link BucketEntry#refCnt} becoming 0.
581   */
582  void freeBucketEntry(BucketEntry bucketEntry) {
583    bucketAllocator.freeBlock(bucketEntry.offset());
584    realCacheSize.add(-1 * bucketEntry.getLength());
585  }
586
587  /**
588   * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br>
589   * 1. Close an HFile, and clear all cached blocks. <br>
590   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
591   * <p>
592   * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
593   * Here we evict the block from backingMap immediately, but only free the reference from bucket
594   * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
595   * block, block can only be de-allocated when all of them release the block.
596   * <p>
597   * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
598   * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
599   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak.
600   * @param cacheKey Block to evict
601   * @return true to indicate whether we've evicted successfully or not.
602   */
603  @Override
604  public boolean evictBlock(BlockCacheKey cacheKey) {
605    return doEvictBlock(cacheKey, null);
606  }
607
608  /**
609   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
610   * {@link BucketCache#ramCache}. <br/>
611   * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
612   * {@link BucketEntry} could be removed.
613   * @param cacheKey    {@link BlockCacheKey} to evict.
614   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
615   * @return true to indicate whether we've evicted successfully or not.
616   */
617  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
618    if (!cacheEnabled) {
619      return false;
620    }
621    boolean existedInRamCache = removeFromRamCache(cacheKey);
622    if (bucketEntry == null) {
623      bucketEntry = backingMap.get(cacheKey);
624    }
625    final BucketEntry bucketEntryToUse = bucketEntry;
626
627    if (bucketEntryToUse == null) {
628      if (existedInRamCache) {
629        cacheStats.evicted(0, cacheKey.isPrimary());
630      }
631      return existedInRamCache;
632    } else {
633      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
634        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
635          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
636          return true;
637        }
638        return false;
639      });
640    }
641  }
642
643  /**
644   * <pre>
645   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
646   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
647   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
648   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
649   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
650   *   it is the return value of current {@link BucketCache#createRecycler} method.
651   *
652   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
653   *   it is {@link ByteBuffAllocator#putbackBuffer}.
654   * </pre>
655   */
656  private Recycler createRecycler(final BucketEntry bucketEntry) {
657    return () -> {
658      freeBucketEntry(bucketEntry);
659      return;
660    };
661  }
662
663  /**
664   * NOTE: This method is only for test.
665   */
666  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
667    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
668    if (bucketEntry == null) {
669      return false;
670    }
671    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
672  }
673
674  /**
675   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
676   * {@link BucketEntry#isRpcRef} is false. <br/>
677   * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
678   * {@link BucketEntry} could be removed.
679   * @param blockCacheKey {@link BlockCacheKey} to evict.
680   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
681   * @return true to indicate whether we've evicted successfully or not.
682   */
683  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
684    if (!bucketEntry.isRpcRef()) {
685      return doEvictBlock(blockCacheKey, bucketEntry);
686    }
687    return false;
688  }
689
690  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
691    return ramCache.remove(cacheKey, re -> {
692      if (re != null) {
693        this.blockNumber.decrement();
694        this.heapSize.add(-1 * re.getData().heapSize());
695      }
696    });
697  }
698
699  /*
700   * Statistics thread. Periodically output cache statistics to the log.
701   */
702  private static class StatisticsThread extends Thread {
703    private final BucketCache bucketCache;
704
705    public StatisticsThread(BucketCache bucketCache) {
706      super("BucketCacheStatsThread");
707      setDaemon(true);
708      this.bucketCache = bucketCache;
709    }
710
711    @Override
712    public void run() {
713      bucketCache.logStats();
714    }
715  }
716
717  public void logStats() {
718    long totalSize = bucketAllocator.getTotalSize();
719    long usedSize = bucketAllocator.getUsedSize();
720    long freeSize = totalSize - usedSize;
721    long cacheSize = getRealCacheSize();
722    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
723      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
724      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
725      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
726      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
727      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
728      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
729      + (cacheStats.getHitCount() == 0
730        ? "0,"
731        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
732      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
733      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
734      + (cacheStats.getHitCachingCount() == 0
735        ? "0,"
736        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
737      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
738      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
739      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
740    cacheStats.reset();
741  }
742
743  public long getRealCacheSize() {
744    return this.realCacheSize.sum();
745  }
746
747  public long acceptableSize() {
748    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
749  }
750
751  long getPartitionSize(float partitionFactor) {
752    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
753  }
754
755  /**
756   * Return the count of bucketSizeinfos still need free space
757   */
758  private int bucketSizesAboveThresholdCount(float minFactor) {
759    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
760    int fullCount = 0;
761    for (int i = 0; i < stats.length; i++) {
762      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
763      freeGoal = Math.max(freeGoal, 1);
764      if (stats[i].freeCount() < freeGoal) {
765        fullCount++;
766      }
767    }
768    return fullCount;
769  }
770
771  /**
772   * This method will find the buckets that are minimally occupied and are not reference counted and
773   * will free them completely without any constraint on the access times of the elements, and as a
774   * process will completely free at most the number of buckets passed, sometimes it might not due
775   * to changing refCounts
776   * @param completelyFreeBucketsNeeded number of buckets to free
777   **/
778  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
779    if (completelyFreeBucketsNeeded != 0) {
780      // First we will build a set where the offsets are reference counted, usually
781      // this set is small around O(Handler Count) unless something else is wrong
782      Set<Integer> inUseBuckets = new HashSet<>();
783      backingMap.forEach((k, be) -> {
784        if (be.isRpcRef()) {
785          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
786        }
787      });
788      Set<Integer> candidateBuckets =
789        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
790      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
791        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
792          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
793        }
794      }
795    }
796  }
797
798  /**
799   * Free the space if the used size reaches acceptableSize() or one size block couldn't be
800   * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
801   * blocks evicted
802   * @param why Why we are being called
803   */
804  private void freeSpace(final String why) {
805    // Ensure only one freeSpace progress at a time
806    if (!freeSpaceLock.tryLock()) {
807      return;
808    }
809    try {
810      freeInProgress = true;
811      long bytesToFreeWithoutExtra = 0;
812      // Calculate free byte for each bucketSizeinfo
813      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
814      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
815      long[] bytesToFreeForBucket = new long[stats.length];
816      for (int i = 0; i < stats.length; i++) {
817        bytesToFreeForBucket[i] = 0;
818        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
819        freeGoal = Math.max(freeGoal, 1);
820        if (stats[i].freeCount() < freeGoal) {
821          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
822          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
823          if (msgBuffer != null) {
824            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
825              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
826          }
827        }
828      }
829      if (msgBuffer != null) {
830        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
831      }
832
833      if (bytesToFreeWithoutExtra <= 0) {
834        return;
835      }
836      long currentSize = bucketAllocator.getUsedSize();
837      long totalSize = bucketAllocator.getTotalSize();
838      if (LOG.isDebugEnabled() && msgBuffer != null) {
839        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
840          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
841          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
842          + StringUtils.byteDesc(totalSize));
843      }
844
845      long bytesToFreeWithExtra =
846        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
847
848      // Instantiate priority buckets
849      BucketEntryGroup bucketSingle =
850        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
851      BucketEntryGroup bucketMulti =
852        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
853      BucketEntryGroup bucketMemory =
854        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
855
856      // Scan entire map putting bucket entry into appropriate bucket entry
857      // group
858      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
859        switch (bucketEntryWithKey.getValue().getPriority()) {
860          case SINGLE: {
861            bucketSingle.add(bucketEntryWithKey);
862            break;
863          }
864          case MULTI: {
865            bucketMulti.add(bucketEntryWithKey);
866            break;
867          }
868          case MEMORY: {
869            bucketMemory.add(bucketEntryWithKey);
870            break;
871          }
872        }
873      }
874
875      PriorityQueue<BucketEntryGroup> bucketQueue =
876        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
877
878      bucketQueue.add(bucketSingle);
879      bucketQueue.add(bucketMulti);
880      bucketQueue.add(bucketMemory);
881
882      int remainingBuckets = bucketQueue.size();
883      long bytesFreed = 0;
884
885      BucketEntryGroup bucketGroup;
886      while ((bucketGroup = bucketQueue.poll()) != null) {
887        long overflow = bucketGroup.overflow();
888        if (overflow > 0) {
889          long bucketBytesToFree =
890            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
891          bytesFreed += bucketGroup.free(bucketBytesToFree);
892        }
893        remainingBuckets--;
894      }
895
896      // Check and free if there are buckets that still need freeing of space
897      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
898        bucketQueue.clear();
899        remainingBuckets = 3;
900
901        bucketQueue.add(bucketSingle);
902        bucketQueue.add(bucketMulti);
903        bucketQueue.add(bucketMemory);
904
905        while ((bucketGroup = bucketQueue.poll()) != null) {
906          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
907          bytesFreed += bucketGroup.free(bucketBytesToFree);
908          remainingBuckets--;
909        }
910      }
911
912      // Even after the above free we might still need freeing because of the
913      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
914      // there might be some buckets where the occupancy is very sparse and thus are not
915      // yielding the free for the other bucket sizes, the fix for this to evict some
916      // of the buckets, we do this by evicting the buckets that are least fulled
917      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));
918
919      if (LOG.isDebugEnabled()) {
920        long single = bucketSingle.totalSize();
921        long multi = bucketMulti.totalSize();
922        long memory = bucketMemory.totalSize();
923        if (LOG.isDebugEnabled()) {
924          LOG.debug("Bucket cache free space completed; " + "freed="
925            + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
926            + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
927            + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
928        }
929      }
930
931    } catch (Throwable t) {
932      LOG.warn("Failed freeing space", t);
933    } finally {
934      cacheStats.evict();
935      freeInProgress = false;
936      freeSpaceLock.unlock();
937    }
938  }
939
940  // This handles flushing the RAM cache to IOEngine.
941  class WriterThread extends Thread {
942    private final BlockingQueue<RAMQueueEntry> inputQueue;
943    private volatile boolean writerEnabled = true;
944    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
945
946    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
947      super("BucketCacheWriterThread");
948      this.inputQueue = queue;
949    }
950
951    // Used for test
952    void disableWriter() {
953      this.writerEnabled = false;
954    }
955
956    @Override
957    public void run() {
958      List<RAMQueueEntry> entries = new ArrayList<>();
959      try {
960        while (cacheEnabled && writerEnabled) {
961          try {
962            try {
963              // Blocks
964              entries = getRAMQueueEntries(inputQueue, entries);
965            } catch (InterruptedException ie) {
966              if (!cacheEnabled || !writerEnabled) {
967                break;
968              }
969            }
970            doDrain(entries, metaBuff);
971          } catch (Exception ioe) {
972            LOG.error("WriterThread encountered error", ioe);
973          }
974        }
975      } catch (Throwable t) {
976        LOG.warn("Failed doing drain", t);
977      }
978      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
979    }
980  }
981
982  /**
983   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
984   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
985   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
986   * with the same cache key do the same thing for the same cache key, so if not evict the previous
987   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
988   * bucketAllocator do not free its memory.
989   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
990   *      cacheKey, Cacheable newBlock)
991   * @param key         Block cache key
992   * @param bucketEntry Bucket entry to put into backingMap.
993   */
994  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
995    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
996    if (previousEntry != null && previousEntry != bucketEntry) {
997      previousEntry.withWriteLock(offsetLock, () -> {
998        blockEvicted(key, previousEntry, false);
999        return null;
1000      });
1001    }
1002  }
1003
1004  /**
1005   * Prepare and return a warning message for Bucket Allocator Exception
1006   * @param fle The exception
1007   * @param re  The RAMQueueEntry for which the exception was thrown.
1008   * @return A warning message created from the input RAMQueueEntry object.
1009   */
1010  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1011    final RAMQueueEntry re) {
1012    final StringBuilder sb = new StringBuilder();
1013    sb.append("Most recent failed allocation after ");
1014    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1015    sb.append(" ms;");
1016    if (re != null) {
1017      if (re.getData() instanceof HFileBlock) {
1018        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1019        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1020        final String tableName = Bytes.toString(fileContext.getTableName());
1021        if (tableName != null && columnFamily != null) {
1022          sb.append(" Table: ");
1023          sb.append(tableName);
1024          sb.append(" CF: ");
1025          sb.append(columnFamily);
1026          sb.append(" HFile: ");
1027          sb.append(fileContext.getHFileName());
1028        }
1029      } else {
1030        sb.append(" HFile: ");
1031        sb.append(re.getKey());
1032      }
1033    }
1034    sb.append(" Message: ");
1035    sb.append(fle.getMessage());
1036    return sb.toString();
1037  }
1038
1039  /**
1040   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
1041   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
1042   * references and we'll OOME.
1043   * @param entries Presumes list passed in here will be processed by this invocation only. No
1044   *                interference expected.
1045   */
1046  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1047    if (entries.isEmpty()) {
1048      return;
1049    }
1050    // This method is a little hard to follow. We run through the passed in entries and for each
1051    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1052    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1053    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
1054    // filling ramCache. We do the clean up by again running through the passed in entries
1055    // doing extra work when we find a non-null bucketEntries corresponding entry.
1056    final int size = entries.size();
1057    BucketEntry[] bucketEntries = new BucketEntry[size];
1058    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1059    // when we go to add an entry by going around the loop again without upping the index.
1060    int index = 0;
1061    while (cacheEnabled && index < size) {
1062      RAMQueueEntry re = null;
1063      try {
1064        re = entries.get(index);
1065        if (re == null) {
1066          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1067          index++;
1068          continue;
1069        }
1070        BlockCacheKey cacheKey = re.getKey();
1071        if (ramCache.containsKey(cacheKey)) {
1072          blocksByHFile.add(cacheKey);
1073        }
1074        // Reset the position for reuse.
1075        // It should be guaranteed that the data in the metaBuff has been transferred to the
1076        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1077        // transferred with our current IOEngines. Should take care, when we have new kinds of
1078        // IOEngine in the future.
1079        metaBuff.clear();
1080        BucketEntry bucketEntry =
1081          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
1082        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1083        bucketEntries[index] = bucketEntry;
1084        if (ioErrorStartTime > 0) {
1085          ioErrorStartTime = -1;
1086        }
1087        index++;
1088      } catch (BucketAllocatorException fle) {
1089        long currTs = EnvironmentEdgeManager.currentTime();
1090        cacheStats.allocationFailed(); // Record the warning.
1091        if (
1092          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1093        ) {
1094          LOG.warn(getAllocationFailWarningMessage(fle, re));
1095          allocFailLogPrevTs = currTs;
1096        }
1097        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1098        bucketEntries[index] = null;
1099        index++;
1100      } catch (CacheFullException cfe) {
1101        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1102        if (!freeInProgress) {
1103          freeSpace("Full!");
1104        } else {
1105          Thread.sleep(50);
1106        }
1107      } catch (IOException ioex) {
1108        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1109        LOG.error("Failed writing to bucket cache", ioex);
1110        checkIOErrorIsTolerated();
1111      }
1112    }
1113
1114    // Make sure data pages are written on media before we update maps.
1115    try {
1116      ioEngine.sync();
1117    } catch (IOException ioex) {
1118      LOG.error("Failed syncing IO engine", ioex);
1119      checkIOErrorIsTolerated();
1120      // Since we failed sync, free the blocks in bucket allocator
1121      for (int i = 0; i < entries.size(); ++i) {
1122        if (bucketEntries[i] != null) {
1123          bucketAllocator.freeBlock(bucketEntries[i].offset());
1124          bucketEntries[i] = null;
1125        }
1126      }
1127    }
1128
1129    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1130    // success or error.
1131    for (int i = 0; i < size; ++i) {
1132      BlockCacheKey key = entries.get(i).getKey();
1133      // Only add if non-null entry.
1134      if (bucketEntries[i] != null) {
1135        putIntoBackingMap(key, bucketEntries[i]);
1136      }
1137      // Always remove from ramCache even if we failed adding it to the block cache above.
1138      boolean existed = ramCache.remove(key, re -> {
1139        if (re != null) {
1140          heapSize.add(-1 * re.getData().heapSize());
1141        }
1142      });
1143      if (!existed && bucketEntries[i] != null) {
1144        // Block should have already been evicted. Remove it and free space.
1145        final BucketEntry bucketEntry = bucketEntries[i];
1146        bucketEntry.withWriteLock(offsetLock, () -> {
1147          if (backingMap.remove(key, bucketEntry)) {
1148            blockEvicted(key, bucketEntry, false);
1149          }
1150          return null;
1151        });
1152      }
1153    }
1154
1155    long used = bucketAllocator.getUsedSize();
1156    if (used > acceptableSize()) {
1157      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1158    }
1159    return;
1160  }
1161
1162  /**
1163   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1164   * returning.
1165   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1166   *                   in case.
1167   * @param q          The queue to take from.
1168   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1169   */
1170  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1171    List<RAMQueueEntry> receptacle) throws InterruptedException {
1172    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1173    // ok even if list grew to accommodate thousands.
1174    receptacle.clear();
1175    receptacle.add(q.take());
1176    q.drainTo(receptacle);
1177    return receptacle;
1178  }
1179
1180  /**
1181   * @see #retrieveFromFile(int[])
1182   */
1183  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1184      justification = "false positive, try-with-resources ensures close is called.")
1185  private void persistToFile() throws IOException {
1186    assert !cacheEnabled;
1187    if (!ioEngine.isPersistent()) {
1188      throw new IOException("Attempt to persist non-persistent cache mappings!");
1189    }
1190    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
1191      fos.write(ProtobufMagic.PB_MAGIC);
1192      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
1193    }
1194  }
1195
1196  /**
1197   * @see #persistToFile()
1198   */
1199  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1200    File persistenceFile = new File(persistencePath);
1201    if (!persistenceFile.exists()) {
1202      return;
1203    }
1204    assert !cacheEnabled;
1205
1206    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
1207      int pblen = ProtobufMagic.lengthOfPBMagic();
1208      byte[] pbuf = new byte[pblen];
1209      int read = in.read(pbuf);
1210      if (read != pblen) {
1211        throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
1212          + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
1213      }
1214      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
1215        // In 3.0 we have enough flexibility to dump the old cache data.
1216        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1217        throw new IOException(
1218          "Persistence file does not start with protobuf magic number. " + persistencePath);
1219      }
1220      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1221      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1222      blockNumber.add(backingMap.size());
1223    }
1224  }
1225
1226  /**
1227   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
1228   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
1229   *
1230   * <pre>
1231   *   File f = ...
1232   *   try (FileInputStream fis = new FileInputStream(f)) {
1233   *     // use the input stream
1234   *   } finally {
1235   *     if (!f.delete()) throw new IOException("failed to delete");
1236   *   }
1237   * </pre>
1238   *
1239   * @param file the file to read and delete
1240   * @return a FileInputStream for the given file
1241   * @throws IOException if there is a problem creating the stream
1242   */
1243  private FileInputStream deleteFileOnClose(final File file) throws IOException {
1244    return new FileInputStream(file) {
1245      private File myFile;
1246
1247      private FileInputStream init(File file) {
1248        myFile = file;
1249        return this;
1250      }
1251
1252      @Override
1253      public void close() throws IOException {
1254        // close() will be called during try-with-resources and it will be
1255        // called by finalizer thread during GC. To avoid double-free resource,
1256        // set myFile to null after the first call.
1257        if (myFile == null) {
1258          return;
1259        }
1260
1261        super.close();
1262        if (!myFile.delete()) {
1263          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
1264        }
1265        myFile = null;
1266      }
1267    }.init(file);
1268  }
1269
1270  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1271    throws IOException {
1272    if (capacitySize != cacheCapacity) {
1273      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1274        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1275    }
1276    if (!ioEngine.getClass().getName().equals(ioclass)) {
1277      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1278        + ioEngine.getClass().getName());
1279    }
1280    if (!backingMap.getClass().getName().equals(mapclass)) {
1281      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1282        + backingMap.getClass().getName());
1283    }
1284  }
1285
1286  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1287    if (proto.hasChecksum()) {
1288      ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1289        algorithm);
1290    } else {
1291      // if has not checksum, it means the persistence file is old format
1292      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
1293    }
1294    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1295    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1296      this::createRecycler);
1297  }
1298
1299  /**
1300   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1301   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1302   */
1303  private void checkIOErrorIsTolerated() {
1304    long now = EnvironmentEdgeManager.currentTime();
1305    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1306    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1307    if (ioErrorStartTimeTmp > 0) {
1308      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1309        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1310          + "ms, disabling cache, please check your IOEngine");
1311        disableCache();
1312      }
1313    } else {
1314      this.ioErrorStartTime = now;
1315    }
1316  }
1317
1318  /**
1319   * Used to shut down the cache -or- turn it off in the case of something broken.
1320   */
1321  private void disableCache() {
1322    if (!cacheEnabled) return;
1323    cacheEnabled = false;
1324    ioEngine.shutdown();
1325    this.scheduleThreadPool.shutdown();
1326    for (int i = 0; i < writerThreads.length; ++i)
1327      writerThreads[i].interrupt();
1328    this.ramCache.clear();
1329    if (!ioEngine.isPersistent() || persistencePath == null) {
1330      // If persistent ioengine and a path, we will serialize out the backingMap.
1331      this.backingMap.clear();
1332    }
1333  }
1334
1335  private void join() throws InterruptedException {
1336    for (int i = 0; i < writerThreads.length; ++i)
1337      writerThreads[i].join();
1338  }
1339
1340  @Override
1341  public void shutdown() {
1342    disableCache();
1343    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1344      + persistencePath);
1345    if (ioEngine.isPersistent() && persistencePath != null) {
1346      try {
1347        join();
1348        persistToFile();
1349      } catch (IOException ex) {
1350        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1351      } catch (InterruptedException e) {
1352        LOG.warn("Failed to persist data on exit", e);
1353      }
1354    }
1355  }
1356
1357  @Override
1358  public CacheStats getStats() {
1359    return cacheStats;
1360  }
1361
1362  public BucketAllocator getAllocator() {
1363    return this.bucketAllocator;
1364  }
1365
1366  @Override
1367  public long heapSize() {
1368    return this.heapSize.sum();
1369  }
1370
1371  @Override
1372  public long size() {
1373    return this.realCacheSize.sum();
1374  }
1375
1376  @Override
1377  public long getCurrentDataSize() {
1378    return size();
1379  }
1380
1381  @Override
1382  public long getFreeSize() {
1383    return this.bucketAllocator.getFreeSize();
1384  }
1385
1386  @Override
1387  public long getBlockCount() {
1388    return this.blockNumber.sum();
1389  }
1390
1391  @Override
1392  public long getDataBlockCount() {
1393    return getBlockCount();
1394  }
1395
1396  @Override
1397  public long getCurrentSize() {
1398    return this.bucketAllocator.getUsedSize();
1399  }
1400
1401  protected String getAlgorithm() {
1402    return algorithm;
1403  }
1404
1405  /**
1406   * Evicts all blocks for a specific HFile.
1407   * <p>
1408   * This is used for evict-on-close to remove all blocks of a specific HFile.
1409   * @return the number of blocks evicted
1410   */
1411  @Override
1412  public int evictBlocksByHfileName(String hfileName) {
1413    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
1414      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1415
1416    int numEvicted = 0;
1417    for (BlockCacheKey key : keySet) {
1418      if (evictBlock(key)) {
1419        ++numEvicted;
1420      }
1421    }
1422
1423    return numEvicted;
1424  }
1425
1426  /**
1427   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1428   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1429   * number of elements out of each according to configuration parameters and their relative sizes.
1430   */
1431  private class BucketEntryGroup {
1432
1433    private CachedEntryQueue queue;
1434    private long totalSize = 0;
1435    private long bucketSize;
1436
1437    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1438      this.bucketSize = bucketSize;
1439      queue = new CachedEntryQueue(bytesToFree, blockSize);
1440      totalSize = 0;
1441    }
1442
1443    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1444      totalSize += block.getValue().getLength();
1445      queue.add(block);
1446    }
1447
1448    public long free(long toFree) {
1449      Map.Entry<BlockCacheKey, BucketEntry> entry;
1450      long freedBytes = 0;
1451      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1452      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1453      while ((entry = queue.pollLast()) != null) {
1454        BlockCacheKey blockCacheKey = entry.getKey();
1455        BucketEntry be = entry.getValue();
1456        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1457          freedBytes += be.getLength();
1458        }
1459        if (freedBytes >= toFree) {
1460          return freedBytes;
1461        }
1462      }
1463      return freedBytes;
1464    }
1465
1466    public long overflow() {
1467      return totalSize - bucketSize;
1468    }
1469
1470    public long totalSize() {
1471      return totalSize;
1472    }
1473  }
1474
1475  /**
1476   * Block Entry stored in the memory with key,data and so on
1477   */
1478  static class RAMQueueEntry {
1479    private final BlockCacheKey key;
1480    private final Cacheable data;
1481    private long accessCounter;
1482    private boolean inMemory;
1483
1484    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
1485      this.key = bck;
1486      this.data = data;
1487      this.accessCounter = accessCounter;
1488      this.inMemory = inMemory;
1489    }
1490
1491    public Cacheable getData() {
1492      return data;
1493    }
1494
1495    public BlockCacheKey getKey() {
1496      return key;
1497    }
1498
1499    public void access(long accessCounter) {
1500      this.accessCounter = accessCounter;
1501    }
1502
1503    private ByteBuffAllocator getByteBuffAllocator() {
1504      if (data instanceof HFileBlock) {
1505        return ((HFileBlock) data).getByteBuffAllocator();
1506      }
1507      return ByteBuffAllocator.HEAP;
1508    }
1509
1510    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
1511      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
1512      ByteBuffer metaBuff) throws IOException {
1513      int len = data.getSerializedLength();
1514      // This cacheable thing can't be serialized
1515      if (len == 0) {
1516        return null;
1517      }
1518      long offset = alloc.allocateBlock(len);
1519      boolean succ = false;
1520      BucketEntry bucketEntry = null;
1521      try {
1522        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
1523          getByteBuffAllocator());
1524        bucketEntry.setDeserializerReference(data.getDeserializer());
1525        if (data instanceof HFileBlock) {
1526          // If an instance of HFileBlock, save on some allocations.
1527          HFileBlock block = (HFileBlock) data;
1528          ByteBuff sliceBuf = block.getBufferReadOnly();
1529          block.getMetaData(metaBuff);
1530          ioEngine.write(sliceBuf, offset);
1531          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
1532        } else {
1533          // Only used for testing.
1534          ByteBuffer bb = ByteBuffer.allocate(len);
1535          data.serialize(bb, true);
1536          ioEngine.write(bb, offset);
1537        }
1538        succ = true;
1539      } finally {
1540        if (!succ) {
1541          alloc.freeBlock(offset);
1542        }
1543      }
1544      realCacheSize.add(len);
1545      return bucketEntry;
1546    }
1547  }
1548
1549  /**
1550   * Only used in test n
1551   */
1552  void stopWriterThreads() throws InterruptedException {
1553    for (WriterThread writerThread : writerThreads) {
1554      writerThread.disableWriter();
1555      writerThread.interrupt();
1556      writerThread.join();
1557    }
1558  }
1559
1560  @Override
1561  public Iterator<CachedBlock> iterator() {
1562    // Don't bother with ramcache since stuff is in here only a little while.
1563    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
1564    return new Iterator<CachedBlock>() {
1565      private final long now = System.nanoTime();
1566
1567      @Override
1568      public boolean hasNext() {
1569        return i.hasNext();
1570      }
1571
1572      @Override
1573      public CachedBlock next() {
1574        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1575        return new CachedBlock() {
1576          @Override
1577          public String toString() {
1578            return BlockCacheUtil.toString(this, now);
1579          }
1580
1581          @Override
1582          public BlockPriority getBlockPriority() {
1583            return e.getValue().getPriority();
1584          }
1585
1586          @Override
1587          public BlockType getBlockType() {
1588            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
1589            return null;
1590          }
1591
1592          @Override
1593          public long getOffset() {
1594            return e.getKey().getOffset();
1595          }
1596
1597          @Override
1598          public long getSize() {
1599            return e.getValue().getLength();
1600          }
1601
1602          @Override
1603          public long getCachedTime() {
1604            return e.getValue().getCachedTime();
1605          }
1606
1607          @Override
1608          public String getFilename() {
1609            return e.getKey().getHfileName();
1610          }
1611
1612          @Override
1613          public int compareTo(CachedBlock other) {
1614            int diff = this.getFilename().compareTo(other.getFilename());
1615            if (diff != 0) return diff;
1616
1617            diff = Long.compare(this.getOffset(), other.getOffset());
1618            if (diff != 0) return diff;
1619            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1620              throw new IllegalStateException(
1621                "" + this.getCachedTime() + ", " + other.getCachedTime());
1622            }
1623            return Long.compare(other.getCachedTime(), this.getCachedTime());
1624          }
1625
1626          @Override
1627          public int hashCode() {
1628            return e.getKey().hashCode();
1629          }
1630
1631          @Override
1632          public boolean equals(Object obj) {
1633            if (obj instanceof CachedBlock) {
1634              CachedBlock cb = (CachedBlock) obj;
1635              return compareTo(cb) == 0;
1636            } else {
1637              return false;
1638            }
1639          }
1640        };
1641      }
1642
1643      @Override
1644      public void remove() {
1645        throw new UnsupportedOperationException();
1646      }
1647    };
1648  }
1649
  /**
   * Always returns {@code null}.
   * <p>
   * NOTE(review): presumably because this cache is not composed of other block caches -- confirm
   * against the contract of the overridden method.
   */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
1654
1655  public int getRpcRefCount(BlockCacheKey cacheKey) {
1656    BucketEntry bucketEntry = backingMap.get(cacheKey);
1657    if (bucketEntry != null) {
1658      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
1659    }
1660    return 0;
1661  }
1662
1663  float getAcceptableFactor() {
1664    return acceptableFactor;
1665  }
1666
1667  float getMinFactor() {
1668    return minFactor;
1669  }
1670
1671  float getExtraFreeFactor() {
1672    return extraFreeFactor;
1673  }
1674
1675  float getSingleFactor() {
1676    return singleFactor;
1677  }
1678
1679  float getMultiFactor() {
1680    return multiFactor;
1681  }
1682
1683  float getMemoryFactor() {
1684    return memoryFactor;
1685  }
1686
1687  /**
1688   * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
1689   */
1690  static class RAMCache {
1691    /**
1692     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
1693     * {@link RAMCache#get(BlockCacheKey)} and
1694     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee
1695     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
1696     * func method can execute exactly once only when the key is present(or absent) and under the
1697     * lock context. Otherwise, the reference count of block will be messed up. Notice that the
1698     * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
1699     */
1700    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
1701
1702    public boolean containsKey(BlockCacheKey key) {
1703      return delegate.containsKey(key);
1704    }
1705
1706    public RAMQueueEntry get(BlockCacheKey key) {
1707      return delegate.computeIfPresent(key, (k, re) -> {
1708        // It'll be referenced by RPC, so retain atomically here. if the get and retain is not
1709        // atomic, another thread may remove and release the block, when retaining in this thread we
1710        // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422)
1711        re.getData().retain();
1712        return re;
1713      });
1714    }
1715
1716    /**
1717     * Return the previous associated value, or null if absent. It has the same meaning as
1718     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
1719     */
1720    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
1721      AtomicBoolean absent = new AtomicBoolean(false);
1722      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
1723        // The RAMCache reference to this entry, so reference count should be increment.
1724        entry.getData().retain();
1725        absent.set(true);
1726        return entry;
1727      });
1728      return absent.get() ? null : re;
1729    }
1730
1731    public boolean remove(BlockCacheKey key) {
1732      return remove(key, re -> {
1733      });
1734    }
1735
1736    /**
1737     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
1738     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
1739     * exception. the consumer will access entry to remove before release its reference count.
1740     * Notice, don't change its reference count in the {@link Consumer}
1741     */
1742    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
1743      RAMQueueEntry previous = delegate.remove(key);
1744      action.accept(previous);
1745      if (previous != null) {
1746        previous.getData().release();
1747      }
1748      return previous != null;
1749    }
1750
1751    public boolean isEmpty() {
1752      return delegate.isEmpty();
1753    }
1754
1755    public void clear() {
1756      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
1757      while (it.hasNext()) {
1758        RAMQueueEntry re = it.next().getValue();
1759        it.remove();
1760        re.getData().release();
1761      }
1762    }
1763  }
1764}