001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.File;
021import java.io.FileInputStream;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Comparator;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.NavigableSet;
032import java.util.PriorityQueue;
033import java.util.Set;
034import java.util.concurrent.ArrayBlockingQueue;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ConcurrentMap;
038import java.util.concurrent.ConcurrentSkipListSet;
039import java.util.concurrent.Executors;
040import java.util.concurrent.ScheduledExecutorService;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicLong;
044import java.util.concurrent.atomic.LongAdder;
045import java.util.concurrent.locks.Lock;
046import java.util.concurrent.locks.ReentrantLock;
047import java.util.concurrent.locks.ReentrantReadWriteLock;
048import java.util.function.Consumer;
049import java.util.function.Function;
050import org.apache.commons.io.IOUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.io.ByteBuffAllocator;
056import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
057import org.apache.hadoop.hbase.io.HeapSize;
058import org.apache.hadoop.hbase.io.hfile.BlockCache;
059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
061import org.apache.hadoop.hbase.io.hfile.BlockPriority;
062import org.apache.hadoop.hbase.io.hfile.BlockType;
063import org.apache.hadoop.hbase.io.hfile.CacheStats;
064import org.apache.hadoop.hbase.io.hfile.Cacheable;
065import org.apache.hadoop.hbase.io.hfile.CachedBlock;
066import org.apache.hadoop.hbase.io.hfile.HFileBlock;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.nio.ByteBuff;
069import org.apache.hadoop.hbase.nio.RefCnt;
070import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
073import org.apache.hadoop.hbase.util.IdReadWriteLock;
074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
075import org.apache.hadoop.util.StringUtils;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
081import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
082
083import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
084
085/**
086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
087 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
088 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
089 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
090 * {@link FileIOEngine} to store/read the block data.
091 * <p>
 * Eviction uses an algorithm similar to the one used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 * <p>
 * BucketCache can be used as the main block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to
 * decrease CMS GC and heap fragmentation.
 * <p>
 * It can also be used as a secondary cache (e.g. using a file on SSD/Fusion-io to store blocks)
 * to enlarge cache space via a victim cache.
101 */
102@InterfaceAudience.Private
103public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Configuration keys for the priority-bucket factors and the eviction thresholds. */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Default priority-bucket factors (single + multi + memory sum to 1.0) and min factor. */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  // Defaults for the extra-free and acceptable factors when the configuration omits them.
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Period, in seconds, of the scheduled task that logs cache statistics. */
  private static final int statThreadPeriod = 5 * 60;

  // NOTE(review): presumably the default writer thread count and per-writer queue length used
  // when a BucketCache is built from configuration — confirm at the call sites.
  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Engine used to store and read block data
  transient final IOEngine ioEngine;

  // Blocks staged in memory here until a WriterThread drains them to the IOEngine
  transient final RAMCache ramCache;
  // Metadata (offset, length, ...) of blocks that have been written to the IOEngine
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking hash of the cache key modulo queue count. WriterThread when
   * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It
   * then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track whether a free-space pass is in progress */
  private volatile boolean freeInProgress = false;
  // NOTE(review): presumably serializes free-space passes — its users are not shown here.
  private transient final Lock freeSpaceLock = new ReentrantLock();

  // Bytes accounted to cached blocks; freeBucketEntry subtracts each freed entry's length.
  private final LongAdder realCacheSize = new LongAdder();
  // Heap size of entries staged in ramCache: added when an entry is queued for writing and
  // subtracted when it is removed from ramCache.
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  /** Max time, in milliseconds, to wait for writer-queue space when caching with wait=true. */
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  /**
   * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
   * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
   * IOEngine.
   */
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  // Persistence file path; null when persistence is not used.
  private final String persistencePath;
  // Total cache capacity in bytes.
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration (ms) of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset to -1 (meaning "no outstanding error") after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read.
   * <p>
   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
   */
  transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);

  // Cached block keys ordered by HFile name, then by offset within the file.
  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
    if (nameComparison != 0) {
      return nameComparison;
    }
    return Long.compare(a.getOffset(), b.getOffset());
  });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  /**
   * Name of the {@link java.security.MessageDigest} digest algorithm used to check persistent file
   * integrity; default algorithm is MD5
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
251
252  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
253    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
254    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
255      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
256  }
257
258  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
259    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
260    Configuration conf) throws IOException {
261    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
262    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
263    this.writerThreads = new WriterThread[writerThreadNum];
264    long blockNumCapacity = capacity / blockSize;
265    if (blockNumCapacity >= Integer.MAX_VALUE) {
266      // Enough for about 32TB of cache!
267      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
268    }
269
270    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
271    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
272    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
273    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
274    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
275    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
276
277    sanityCheckConfigs();
278
279    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
280      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
281      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor);
282
283    this.cacheCapacity = capacity;
284    this.persistencePath = persistencePath;
285    this.blockSize = blockSize;
286    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
287
288    this.allocFailLogPrevTs = 0;
289
290    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
291    for (int i = 0; i < writerThreads.length; ++i) {
292      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
293    }
294
295    assert writerQueues.size() == writerThreads.length;
296    this.ramCache = new RAMCache();
297
298    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
299
300    if (ioEngine.isPersistent() && persistencePath != null) {
301      try {
302        retrieveFromFile(bucketSizes);
303      } catch (IOException ioex) {
304        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
305      }
306    }
307    final String threadName = Thread.currentThread().getName();
308    this.cacheEnabled = true;
309    for (int i = 0; i < writerThreads.length; ++i) {
310      writerThreads[i] = new WriterThread(writerQueues.get(i));
311      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
312      writerThreads[i].setDaemon(true);
313    }
314    startWriterThreads();
315
316    // Run the statistics thread periodically to print the cache statistics log
317    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
318    // every five minutes.
319    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
320      statThreadPeriod, TimeUnit.SECONDS);
321    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
322      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
323      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
324      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
325  }
326
327  private void sanityCheckConfigs() {
328    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
329      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
330    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
331      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
332    Preconditions.checkArgument(minFactor <= acceptableFactor,
333      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
334    Preconditions.checkArgument(extraFreeFactor >= 0,
335      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
336    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
337      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
338    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
339      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
340    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
341      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
342    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
343      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
344        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
345  }
346
347  /**
348   * Called by the constructor to start the writer threads. Used by tests that need to override
349   * starting the threads.
350   */
351  protected void startWriterThreads() {
352    for (WriterThread thread : writerThreads) {
353      thread.start();
354    }
355  }
356
357  boolean isCacheEnabled() {
358    return this.cacheEnabled;
359  }
360
361  @Override
362  public long getMaxSize() {
363    return this.cacheCapacity;
364  }
365
366  public String getIoEngine() {
367    return ioEngine.toString();
368  }
369
370  /**
371   * Get the IOEngine from the IO engine name nnn * @return the IOEngine n
372   */
373  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
374    throws IOException {
375    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
376      // In order to make the usage simple, we only need the prefix 'files:' in
377      // document whether one or multiple file(s), but also support 'file:' for
378      // the compatibility
379      String[] filePaths =
380        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
381      return new FileIOEngine(capacity, persistencePath != null, filePaths);
382    } else if (ioEngineName.startsWith("offheap")) {
383      return new ByteBufferIOEngine(capacity);
384    } else if (ioEngineName.startsWith("mmap:")) {
385      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
386    } else if (ioEngineName.startsWith("pmem:")) {
387      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
388      // device. Since the persistent memory device has its own address space the contents
389      // mapped to this address space does not get swapped out like in the case of mmapping
390      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
391      // can be directly referred to without having to copy them onheap. Once the RPC is done,
392      // the blocks can be returned back as in case of ByteBufferIOEngine.
393      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
394    } else {
395      throw new IllegalArgumentException(
396        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
397    }
398  }
399
400  /**
401   * Cache the block with the specified name and buffer.
402   * @param cacheKey block's cache key
403   * @param buf      block buffer
404   */
405  @Override
406  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
407    cacheBlock(cacheKey, buf, false);
408  }
409
410  /**
411   * Cache the block with the specified name and buffer.
412   * @param cacheKey   block's cache key
413   * @param cachedItem block buffer
414   * @param inMemory   if block is in-memory
415   */
416  @Override
417  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
418    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
419  }
420
421  /**
422   * Cache the block to ramCache
423   * @param cacheKey   block's cache key
424   * @param cachedItem block buffer
425   * @param inMemory   if block is in-memory
426   * @param wait       if true, blocking wait when queue is full
427   */
428  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
429    boolean wait) {
430    if (cacheEnabled) {
431      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
432        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
433          BucketEntry bucketEntry = backingMap.get(cacheKey);
434          if (bucketEntry != null && bucketEntry.isRpcRef()) {
435            // avoid replace when there are RPC refs for the bucket entry in bucket cache
436            return;
437          }
438          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
439        }
440      } else {
441        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
442      }
443    }
444  }
445
446  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
447    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
448  }
449
450  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
451    boolean inMemory, boolean wait) {
452    if (!cacheEnabled) {
453      return;
454    }
455    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
456    // Stuff the entry into the RAM cache so it can get drained to the persistent store
457    RAMQueueEntry re =
458      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
459    /**
460     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
461     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
462     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
463     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
464     * one, then the heap size will mess up (HBASE-20789)
465     */
466    if (ramCache.putIfAbsent(cacheKey, re) != null) {
467      return;
468    }
469    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
470    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
471    boolean successfulAddition = false;
472    if (wait) {
473      try {
474        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
475      } catch (InterruptedException e) {
476        Thread.currentThread().interrupt();
477      }
478    } else {
479      successfulAddition = bq.offer(re);
480    }
481    if (!successfulAddition) {
482      ramCache.remove(cacheKey);
483      cacheStats.failInsert();
484    } else {
485      this.blockNumber.increment();
486      this.heapSize.add(cachedItem.heapSize());
487    }
488  }
489
490  /**
491   * Get the buffer of the block with the specified key.
492   * @param key                block's cache key
493   * @param caching            true if the caller caches blocks on cache misses
494   * @param repeat             Whether this is a repeat lookup for the same block
495   * @param updateCacheMetrics Whether we should update cache metrics or not
496   * @return buffer of specified cache key, or null if not in cache
497   */
498  @Override
499  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
500    boolean updateCacheMetrics) {
501    if (!cacheEnabled) {
502      return null;
503    }
504    RAMQueueEntry re = ramCache.get(key);
505    if (re != null) {
506      if (updateCacheMetrics) {
507        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
508      }
509      re.access(accessCount.incrementAndGet());
510      return re.getData();
511    }
512    BucketEntry bucketEntry = backingMap.get(key);
513    if (bucketEntry != null) {
514      long start = System.nanoTime();
515      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
516      try {
517        lock.readLock().lock();
518        // We can not read here even if backingMap does contain the given key because its offset
519        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
520        // existence here.
521        if (bucketEntry.equals(backingMap.get(key))) {
522          // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the
523          // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
524          // the same BucketEntry, then all of the three will share the same refCnt.
525          Cacheable cachedBlock = ioEngine.read(bucketEntry);
526          if (ioEngine.usesSharedMemory()) {
527            // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
528            // same RefCnt, do retain here, in order to count the number of RPC references
529            cachedBlock.retain();
530          }
531          // Update the cache statistics.
532          if (updateCacheMetrics) {
533            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
534            cacheStats.ioHit(System.nanoTime() - start);
535          }
536          bucketEntry.access(accessCount.incrementAndGet());
537          if (this.ioErrorStartTime > 0) {
538            ioErrorStartTime = -1;
539          }
540          return cachedBlock;
541        }
542      } catch (IOException ioex) {
543        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
544        checkIOErrorIsTolerated();
545      } finally {
546        lock.readLock().unlock();
547      }
548    }
549    if (!repeat && updateCacheMetrics) {
550      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
551    }
552    return null;
553  }
554
555  /**
556   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
557   */
558  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
559    boolean evictedByEvictionProcess) {
560    bucketEntry.markAsEvicted();
561    blocksByHFile.remove(cacheKey);
562    if (decrementBlockNumber) {
563      this.blockNumber.decrement();
564    }
565    if (evictedByEvictionProcess) {
566      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
567    }
568  }
569
570  /**
571   * Free the {{@link BucketEntry} actually,which could only be invoked when the
572   * {@link BucketEntry#refCnt} becoming 0.
573   */
574  void freeBucketEntry(BucketEntry bucketEntry) {
575    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
576    realCacheSize.add(-1 * bucketEntry.getLength());
577  }
578
579  /**
580   * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br>
581   * 1. Close an HFile, and clear all cached blocks. <br>
582   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
583   * <p>
584   * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
585   * Here we evict the block from backingMap immediately, but only free the reference from bucket
586   * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
587   * block, block can only be de-allocated when all of them release the block.
588   * <p>
589   * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
590   * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
591   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak.
592   * @param cacheKey Block to evict
593   * @return true to indicate whether we've evicted successfully or not.
594   */
595  @Override
596  public boolean evictBlock(BlockCacheKey cacheKey) {
597    return doEvictBlock(cacheKey, null, false);
598  }
599
600  /**
601   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
602   * {@link BucketCache#ramCache}. <br/>
603   * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
604   * {@link BucketEntry} could be removed.
605   * @param cacheKey    {@link BlockCacheKey} to evict.
606   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
607   * @return true to indicate whether we've evicted successfully or not.
608   */
609  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
610    boolean evictedByEvictionProcess) {
611    if (!cacheEnabled) {
612      return false;
613    }
614    boolean existedInRamCache = removeFromRamCache(cacheKey);
615    if (bucketEntry == null) {
616      bucketEntry = backingMap.get(cacheKey);
617    }
618    final BucketEntry bucketEntryToUse = bucketEntry;
619
620    if (bucketEntryToUse == null) {
621      if (existedInRamCache && evictedByEvictionProcess) {
622        cacheStats.evicted(0, cacheKey.isPrimary());
623      }
624      return existedInRamCache;
625    } else {
626      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
627        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
628          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
629          return true;
630        }
631        return false;
632      });
633    }
634  }
635
636  /**
637   * <pre>
638   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
639   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
640   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
641   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
642   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
643   *   it is the return value of current {@link BucketCache#createRecycler} method.
644   *
645   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
646   *   it is {@link ByteBuffAllocator#putbackBuffer}.
647   * </pre>
648   */
649  private Recycler createRecycler(final BucketEntry bucketEntry) {
650    return () -> {
651      freeBucketEntry(bucketEntry);
652      return;
653    };
654  }
655
656  /**
657   * NOTE: This method is only for test.
658   */
659  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
660    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
661    if (bucketEntry == null) {
662      return false;
663    }
664    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
665  }
666
667  /**
668   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
669   * {@link BucketEntry#isRpcRef} is false. <br/>
670   * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
671   * {@link BucketEntry} could be removed.
672   * @param blockCacheKey {@link BlockCacheKey} to evict.
673   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
674   * @return true to indicate whether we've evicted successfully or not.
675   */
676  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
677    if (!bucketEntry.isRpcRef()) {
678      return doEvictBlock(blockCacheKey, bucketEntry, true);
679    }
680    return false;
681  }
682
683  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
684    return ramCache.remove(cacheKey, re -> {
685      if (re != null) {
686        this.blockNumber.decrement();
687        this.heapSize.add(-1 * re.getData().heapSize());
688      }
689    });
690  }
691
692  /*
693   * Statistics thread. Periodically output cache statistics to the log.
694   */
695  private static class StatisticsThread extends Thread {
696    private final BucketCache bucketCache;
697
698    public StatisticsThread(BucketCache bucketCache) {
699      super("BucketCacheStatsThread");
700      setDaemon(true);
701      this.bucketCache = bucketCache;
702    }
703
704    @Override
705    public void run() {
706      bucketCache.logStats();
707    }
708  }
709
710  public void logStats() {
711    long totalSize = bucketAllocator.getTotalSize();
712    long usedSize = bucketAllocator.getUsedSize();
713    long freeSize = totalSize - usedSize;
714    long cacheSize = getRealCacheSize();
715    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
716      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
717      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
718      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
719      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
720      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
721      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
722      + (cacheStats.getHitCount() == 0
723        ? "0,"
724        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
725      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
726      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
727      + (cacheStats.getHitCachingCount() == 0
728        ? "0,"
729        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
730      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
731      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
732      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
733    cacheStats.reset();
734
735    bucketAllocator.logDebugStatistics();
736  }
737
738  public long getRealCacheSize() {
739    return this.realCacheSize.sum();
740  }
741
742  public long acceptableSize() {
743    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
744  }
745
746  long getPartitionSize(float partitionFactor) {
747    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
748  }
749
750  /**
751   * Return the count of bucketSizeinfos still need free space
752   */
753  private int bucketSizesAboveThresholdCount(float minFactor) {
754    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
755    int fullCount = 0;
756    for (int i = 0; i < stats.length; i++) {
757      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
758      freeGoal = Math.max(freeGoal, 1);
759      if (stats[i].freeCount() < freeGoal) {
760        fullCount++;
761      }
762    }
763    return fullCount;
764  }
765
766  /**
767   * This method will find the buckets that are minimally occupied and are not reference counted and
768   * will free them completely without any constraint on the access times of the elements, and as a
769   * process will completely free at most the number of buckets passed, sometimes it might not due
770   * to changing refCounts
771   * @param completelyFreeBucketsNeeded number of buckets to free
772   **/
773  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
774    if (completelyFreeBucketsNeeded != 0) {
775      // First we will build a set where the offsets are reference counted, usually
776      // this set is small around O(Handler Count) unless something else is wrong
777      Set<Integer> inUseBuckets = new HashSet<>();
778      backingMap.forEach((k, be) -> {
779        if (be.isRpcRef()) {
780          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
781        }
782      });
783      Set<Integer> candidateBuckets =
784        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
785      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
786        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
787          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
788        }
789      }
790    }
791  }
792
793  /**
794   * Free the space if the used size reaches acceptableSize() or one size block couldn't be
795   * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
796   * blocks evicted
797   * @param why Why we are being called
798   */
799  void freeSpace(final String why) {
800    // Ensure only one freeSpace progress at a time
801    if (!freeSpaceLock.tryLock()) {
802      return;
803    }
804    try {
805      freeInProgress = true;
806      long bytesToFreeWithoutExtra = 0;
807      // Calculate free byte for each bucketSizeinfo
808      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
809      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
810      long[] bytesToFreeForBucket = new long[stats.length];
811      for (int i = 0; i < stats.length; i++) {
812        bytesToFreeForBucket[i] = 0;
813        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
814        freeGoal = Math.max(freeGoal, 1);
815        if (stats[i].freeCount() < freeGoal) {
816          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
817          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
818          if (msgBuffer != null) {
819            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
820              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
821          }
822        }
823      }
824      if (msgBuffer != null) {
825        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
826      }
827
828      if (bytesToFreeWithoutExtra <= 0) {
829        return;
830      }
831      long currentSize = bucketAllocator.getUsedSize();
832      long totalSize = bucketAllocator.getTotalSize();
833      if (LOG.isDebugEnabled() && msgBuffer != null) {
834        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
835          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
836          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
837          + StringUtils.byteDesc(totalSize));
838      }
839
840      long bytesToFreeWithExtra =
841        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
842
843      // Instantiate priority buckets
844      BucketEntryGroup bucketSingle =
845        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
846      BucketEntryGroup bucketMulti =
847        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
848      BucketEntryGroup bucketMemory =
849        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
850
851      // Scan entire map putting bucket entry into appropriate bucket entry
852      // group
853      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
854        switch (bucketEntryWithKey.getValue().getPriority()) {
855          case SINGLE: {
856            bucketSingle.add(bucketEntryWithKey);
857            break;
858          }
859          case MULTI: {
860            bucketMulti.add(bucketEntryWithKey);
861            break;
862          }
863          case MEMORY: {
864            bucketMemory.add(bucketEntryWithKey);
865            break;
866          }
867        }
868      }
869
870      PriorityQueue<BucketEntryGroup> bucketQueue =
871        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
872
873      bucketQueue.add(bucketSingle);
874      bucketQueue.add(bucketMulti);
875      bucketQueue.add(bucketMemory);
876
877      int remainingBuckets = bucketQueue.size();
878      long bytesFreed = 0;
879
880      BucketEntryGroup bucketGroup;
881      while ((bucketGroup = bucketQueue.poll()) != null) {
882        long overflow = bucketGroup.overflow();
883        if (overflow > 0) {
884          long bucketBytesToFree =
885            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
886          bytesFreed += bucketGroup.free(bucketBytesToFree);
887        }
888        remainingBuckets--;
889      }
890
891      // Check and free if there are buckets that still need freeing of space
892      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
893        bucketQueue.clear();
894        remainingBuckets = 3;
895
896        bucketQueue.add(bucketSingle);
897        bucketQueue.add(bucketMulti);
898        bucketQueue.add(bucketMemory);
899
900        while ((bucketGroup = bucketQueue.poll()) != null) {
901          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
902          bytesFreed += bucketGroup.free(bucketBytesToFree);
903          remainingBuckets--;
904        }
905      }
906
907      // Even after the above free we might still need freeing because of the
908      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
909      // there might be some buckets where the occupancy is very sparse and thus are not
910      // yielding the free for the other bucket sizes, the fix for this to evict some
911      // of the buckets, we do this by evicting the buckets that are least fulled
912      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));
913
914      if (LOG.isDebugEnabled()) {
915        long single = bucketSingle.totalSize();
916        long multi = bucketMulti.totalSize();
917        long memory = bucketMemory.totalSize();
918        if (LOG.isDebugEnabled()) {
919          LOG.debug("Bucket cache free space completed; " + "freed="
920            + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
921            + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
922            + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
923        }
924      }
925
926    } catch (Throwable t) {
927      LOG.warn("Failed freeing space", t);
928    } finally {
929      cacheStats.evict();
930      freeInProgress = false;
931      freeSpaceLock.unlock();
932    }
933  }
934
935  // This handles flushing the RAM cache to IOEngine.
936  class WriterThread extends Thread {
937    private final BlockingQueue<RAMQueueEntry> inputQueue;
938    private volatile boolean writerEnabled = true;
939    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
940
941    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
942      super("BucketCacheWriterThread");
943      this.inputQueue = queue;
944    }
945
946    // Used for test
947    void disableWriter() {
948      this.writerEnabled = false;
949    }
950
951    @Override
952    public void run() {
953      List<RAMQueueEntry> entries = new ArrayList<>();
954      try {
955        while (cacheEnabled && writerEnabled) {
956          try {
957            try {
958              // Blocks
959              entries = getRAMQueueEntries(inputQueue, entries);
960            } catch (InterruptedException ie) {
961              if (!cacheEnabled || !writerEnabled) {
962                break;
963              }
964            }
965            doDrain(entries, metaBuff);
966          } catch (Exception ioe) {
967            LOG.error("WriterThread encountered error", ioe);
968          }
969        }
970      } catch (Throwable t) {
971        LOG.warn("Failed doing drain", t);
972      }
973      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
974    }
975  }
976
977  /**
978   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
979   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
980   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
981   * with the same cache key do the same thing for the same cache key, so if not evict the previous
982   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
983   * bucketAllocator do not free its memory.
984   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
985   *      cacheKey, Cacheable newBlock)
986   * @param key         Block cache key
987   * @param bucketEntry Bucket entry to put into backingMap.
988   */
989  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
990    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
991    if (previousEntry != null && previousEntry != bucketEntry) {
992      previousEntry.withWriteLock(offsetLock, () -> {
993        blockEvicted(key, previousEntry, false, false);
994        return null;
995      });
996    }
997  }
998
999  /**
1000   * Prepare and return a warning message for Bucket Allocator Exception
1001   * @param fle The exception
1002   * @param re  The RAMQueueEntry for which the exception was thrown.
1003   * @return A warning message created from the input RAMQueueEntry object.
1004   */
1005  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1006    final RAMQueueEntry re) {
1007    final StringBuilder sb = new StringBuilder();
1008    sb.append("Most recent failed allocation after ");
1009    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1010    sb.append(" ms;");
1011    if (re != null) {
1012      if (re.getData() instanceof HFileBlock) {
1013        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1014        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1015        final String tableName = Bytes.toString(fileContext.getTableName());
1016        if (tableName != null && columnFamily != null) {
1017          sb.append(" Table: ");
1018          sb.append(tableName);
1019          sb.append(" CF: ");
1020          sb.append(columnFamily);
1021          sb.append(" HFile: ");
1022          sb.append(fileContext.getHFileName());
1023        }
1024      } else {
1025        sb.append(" HFile: ");
1026        sb.append(re.getKey());
1027      }
1028    }
1029    sb.append(" Message: ");
1030    sb.append(fle.getMessage());
1031    return sb.toString();
1032  }
1033
1034  /**
1035   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
1036   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
1037   * references and we'll OOME.
1038   * @param entries Presumes list passed in here will be processed by this invocation only. No
1039   *                interference expected.
1040   */
1041  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1042    if (entries.isEmpty()) {
1043      return;
1044    }
1045    // This method is a little hard to follow. We run through the passed in entries and for each
1046    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1047    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1048    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
1049    // filling ramCache. We do the clean up by again running through the passed in entries
1050    // doing extra work when we find a non-null bucketEntries corresponding entry.
1051    final int size = entries.size();
1052    BucketEntry[] bucketEntries = new BucketEntry[size];
1053    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1054    // when we go to add an entry by going around the loop again without upping the index.
1055    int index = 0;
1056    while (cacheEnabled && index < size) {
1057      RAMQueueEntry re = null;
1058      try {
1059        re = entries.get(index);
1060        if (re == null) {
1061          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1062          index++;
1063          continue;
1064        }
1065        BlockCacheKey cacheKey = re.getKey();
1066        if (ramCache.containsKey(cacheKey)) {
1067          blocksByHFile.add(cacheKey);
1068        }
1069        // Reset the position for reuse.
1070        // It should be guaranteed that the data in the metaBuff has been transferred to the
1071        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1072        // transferred with our current IOEngines. Should take care, when we have new kinds of
1073        // IOEngine in the future.
1074        metaBuff.clear();
1075        BucketEntry bucketEntry =
1076          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
1077        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1078        bucketEntries[index] = bucketEntry;
1079        if (ioErrorStartTime > 0) {
1080          ioErrorStartTime = -1;
1081        }
1082        index++;
1083      } catch (BucketAllocatorException fle) {
1084        long currTs = EnvironmentEdgeManager.currentTime();
1085        cacheStats.allocationFailed(); // Record the warning.
1086        if (
1087          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1088        ) {
1089          LOG.warn(getAllocationFailWarningMessage(fle, re));
1090          allocFailLogPrevTs = currTs;
1091        }
1092        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1093        bucketEntries[index] = null;
1094        index++;
1095      } catch (CacheFullException cfe) {
1096        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1097        if (!freeInProgress) {
1098          freeSpace("Full!");
1099        } else {
1100          Thread.sleep(50);
1101        }
1102      } catch (IOException ioex) {
1103        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1104        LOG.error("Failed writing to bucket cache", ioex);
1105        checkIOErrorIsTolerated();
1106      }
1107    }
1108
1109    // Make sure data pages are written on media before we update maps.
1110    try {
1111      ioEngine.sync();
1112    } catch (IOException ioex) {
1113      LOG.error("Failed syncing IO engine", ioex);
1114      checkIOErrorIsTolerated();
1115      // Since we failed sync, free the blocks in bucket allocator
1116      for (int i = 0; i < entries.size(); ++i) {
1117        BucketEntry bucketEntry = bucketEntries[i];
1118        if (bucketEntry != null) {
1119          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
1120          bucketEntries[i] = null;
1121        }
1122      }
1123    }
1124
1125    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1126    // success or error.
1127    for (int i = 0; i < size; ++i) {
1128      BlockCacheKey key = entries.get(i).getKey();
1129      // Only add if non-null entry.
1130      if (bucketEntries[i] != null) {
1131        putIntoBackingMap(key, bucketEntries[i]);
1132      }
1133      // Always remove from ramCache even if we failed adding it to the block cache above.
1134      boolean existed = ramCache.remove(key, re -> {
1135        if (re != null) {
1136          heapSize.add(-1 * re.getData().heapSize());
1137        }
1138      });
1139      if (!existed && bucketEntries[i] != null) {
1140        // Block should have already been evicted. Remove it and free space.
1141        final BucketEntry bucketEntry = bucketEntries[i];
1142        bucketEntry.withWriteLock(offsetLock, () -> {
1143          if (backingMap.remove(key, bucketEntry)) {
1144            blockEvicted(key, bucketEntry, false, false);
1145          }
1146          return null;
1147        });
1148      }
1149    }
1150
1151    long used = bucketAllocator.getUsedSize();
1152    if (used > acceptableSize()) {
1153      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1154    }
1155    return;
1156  }
1157
1158  /**
1159   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1160   * returning.
1161   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1162   *                   in case.
1163   * @param q          The queue to take from.
1164   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1165   */
1166  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1167    List<RAMQueueEntry> receptacle) throws InterruptedException {
1168    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1169    // ok even if list grew to accommodate thousands.
1170    receptacle.clear();
1171    receptacle.add(q.take());
1172    q.drainTo(receptacle);
1173    return receptacle;
1174  }
1175
1176  /**
1177   * @see #retrieveFromFile(int[])
1178   */
1179  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1180      justification = "false positive, try-with-resources ensures close is called.")
1181  private void persistToFile() throws IOException {
1182    assert !cacheEnabled;
1183    if (!ioEngine.isPersistent()) {
1184      throw new IOException("Attempt to persist non-persistent cache mappings!");
1185    }
1186    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
1187      fos.write(ProtobufMagic.PB_MAGIC);
1188      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
1189    }
1190  }
1191
1192  /**
1193   * @see #persistToFile()
1194   */
1195  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1196    File persistenceFile = new File(persistencePath);
1197    if (!persistenceFile.exists()) {
1198      return;
1199    }
1200    assert !cacheEnabled;
1201
1202    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
1203      int pblen = ProtobufMagic.lengthOfPBMagic();
1204      byte[] pbuf = new byte[pblen];
1205      IOUtils.readFully(in, pbuf, 0, pblen);
1206      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
1207        // In 3.0 we have enough flexibility to dump the old cache data.
1208        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1209        throw new IOException(
1210          "Persistence file does not start with protobuf magic number. " + persistencePath);
1211      }
1212      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1213      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1214      blockNumber.add(backingMap.size());
1215    }
1216  }
1217
1218  /**
1219   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
1220   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
1221   *
1222   * <pre>
1223   *   File f = ...
1224   *   try (FileInputStream fis = new FileInputStream(f)) {
1225   *     // use the input stream
1226   *   } finally {
1227   *     if (!f.delete()) throw new IOException("failed to delete");
1228   *   }
1229   * </pre>
1230   *
1231   * @param file the file to read and delete
1232   * @return a FileInputStream for the given file
1233   * @throws IOException if there is a problem creating the stream
1234   */
1235  private FileInputStream deleteFileOnClose(final File file) throws IOException {
1236    return new FileInputStream(file) {
1237      private File myFile;
1238
1239      private FileInputStream init(File file) {
1240        myFile = file;
1241        return this;
1242      }
1243
1244      @Override
1245      public void close() throws IOException {
1246        // close() will be called during try-with-resources and it will be
1247        // called by finalizer thread during GC. To avoid double-free resource,
1248        // set myFile to null after the first call.
1249        if (myFile == null) {
1250          return;
1251        }
1252
1253        super.close();
1254        if (!myFile.delete()) {
1255          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
1256        }
1257        myFile = null;
1258      }
1259    }.init(file);
1260  }
1261
1262  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1263    throws IOException {
1264    if (capacitySize != cacheCapacity) {
1265      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1266        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1267    }
1268    if (!ioEngine.getClass().getName().equals(ioclass)) {
1269      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1270        + ioEngine.getClass().getName());
1271    }
1272    if (!backingMap.getClass().getName().equals(mapclass)) {
1273      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1274        + backingMap.getClass().getName());
1275    }
1276  }
1277
1278  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1279    if (proto.hasChecksum()) {
1280      ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1281        algorithm);
1282    } else {
1283      // if has not checksum, it means the persistence file is old format
1284      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
1285    }
1286    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1287    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1288      this::createRecycler);
1289  }
1290
1291  /**
1292   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1293   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1294   */
1295  private void checkIOErrorIsTolerated() {
1296    long now = EnvironmentEdgeManager.currentTime();
1297    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1298    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1299    if (ioErrorStartTimeTmp > 0) {
1300      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1301        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1302          + "ms, disabling cache, please check your IOEngine");
1303        disableCache();
1304      }
1305    } else {
1306      this.ioErrorStartTime = now;
1307    }
1308  }
1309
1310  /**
1311   * Used to shut down the cache -or- turn it off in the case of something broken.
1312   */
1313  private void disableCache() {
1314    if (!cacheEnabled) return;
1315    cacheEnabled = false;
1316    ioEngine.shutdown();
1317    this.scheduleThreadPool.shutdown();
1318    for (int i = 0; i < writerThreads.length; ++i)
1319      writerThreads[i].interrupt();
1320    this.ramCache.clear();
1321    if (!ioEngine.isPersistent() || persistencePath == null) {
1322      // If persistent ioengine and a path, we will serialize out the backingMap.
1323      this.backingMap.clear();
1324    }
1325  }
1326
1327  private void join() throws InterruptedException {
1328    for (int i = 0; i < writerThreads.length; ++i)
1329      writerThreads[i].join();
1330  }
1331
1332  @Override
1333  public void shutdown() {
1334    disableCache();
1335    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1336      + persistencePath);
1337    if (ioEngine.isPersistent() && persistencePath != null) {
1338      try {
1339        join();
1340        persistToFile();
1341      } catch (IOException ex) {
1342        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1343      } catch (InterruptedException e) {
1344        LOG.warn("Failed to persist data on exit", e);
1345      }
1346    }
1347  }
1348
1349  @Override
1350  public CacheStats getStats() {
1351    return cacheStats;
1352  }
1353
1354  public BucketAllocator getAllocator() {
1355    return this.bucketAllocator;
1356  }
1357
1358  @Override
1359  public long heapSize() {
1360    return this.heapSize.sum();
1361  }
1362
1363  @Override
1364  public long size() {
1365    return this.realCacheSize.sum();
1366  }
1367
1368  @Override
1369  public long getCurrentDataSize() {
1370    return size();
1371  }
1372
1373  @Override
1374  public long getFreeSize() {
1375    return this.bucketAllocator.getFreeSize();
1376  }
1377
1378  @Override
1379  public long getBlockCount() {
1380    return this.blockNumber.sum();
1381  }
1382
1383  @Override
1384  public long getDataBlockCount() {
1385    return getBlockCount();
1386  }
1387
1388  @Override
1389  public long getCurrentSize() {
1390    return this.bucketAllocator.getUsedSize();
1391  }
1392
1393  protected String getAlgorithm() {
1394    return algorithm;
1395  }
1396
1397  /**
1398   * Evicts all blocks for a specific HFile.
1399   * <p>
1400   * This is used for evict-on-close to remove all blocks of a specific HFile.
1401   * @return the number of blocks evicted
1402   */
1403  @Override
1404  public int evictBlocksByHfileName(String hfileName) {
1405    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
1406      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1407
1408    int numEvicted = 0;
1409    for (BlockCacheKey key : keySet) {
1410      if (evictBlock(key)) {
1411        ++numEvicted;
1412      }
1413    }
1414
1415    return numEvicted;
1416  }
1417
1418  /**
1419   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1420   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1421   * number of elements out of each according to configuration parameters and their relative sizes.
1422   */
1423  private class BucketEntryGroup {
1424
1425    private CachedEntryQueue queue;
1426    private long totalSize = 0;
1427    private long bucketSize;
1428
1429    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1430      this.bucketSize = bucketSize;
1431      queue = new CachedEntryQueue(bytesToFree, blockSize);
1432      totalSize = 0;
1433    }
1434
1435    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1436      totalSize += block.getValue().getLength();
1437      queue.add(block);
1438    }
1439
1440    public long free(long toFree) {
1441      Map.Entry<BlockCacheKey, BucketEntry> entry;
1442      long freedBytes = 0;
1443      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1444      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1445      while ((entry = queue.pollLast()) != null) {
1446        BlockCacheKey blockCacheKey = entry.getKey();
1447        BucketEntry be = entry.getValue();
1448        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1449          freedBytes += be.getLength();
1450        }
1451        if (freedBytes >= toFree) {
1452          return freedBytes;
1453        }
1454      }
1455      return freedBytes;
1456    }
1457
1458    public long overflow() {
1459      return totalSize - bucketSize;
1460    }
1461
1462    public long totalSize() {
1463      return totalSize;
1464    }
1465  }
1466
1467  /**
1468   * Block Entry stored in the memory with key,data and so on
1469   */
1470  static class RAMQueueEntry {
1471    private final BlockCacheKey key;
1472    private final Cacheable data;
1473    private long accessCounter;
1474    private boolean inMemory;
1475
1476    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
1477      this.key = bck;
1478      this.data = data;
1479      this.accessCounter = accessCounter;
1480      this.inMemory = inMemory;
1481    }
1482
1483    public Cacheable getData() {
1484      return data;
1485    }
1486
1487    public BlockCacheKey getKey() {
1488      return key;
1489    }
1490
1491    public void access(long accessCounter) {
1492      this.accessCounter = accessCounter;
1493    }
1494
1495    private ByteBuffAllocator getByteBuffAllocator() {
1496      if (data instanceof HFileBlock) {
1497        return ((HFileBlock) data).getByteBuffAllocator();
1498      }
1499      return ByteBuffAllocator.HEAP;
1500    }
1501
1502    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
1503      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
1504      ByteBuffer metaBuff) throws IOException {
1505      int len = data.getSerializedLength();
1506      // This cacheable thing can't be serialized
1507      if (len == 0) {
1508        return null;
1509      }
1510      long offset = alloc.allocateBlock(len);
1511      boolean succ = false;
1512      BucketEntry bucketEntry = null;
1513      try {
1514        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
1515          getByteBuffAllocator());
1516        bucketEntry.setDeserializerReference(data.getDeserializer());
1517        if (data instanceof HFileBlock) {
1518          // If an instance of HFileBlock, save on some allocations.
1519          HFileBlock block = (HFileBlock) data;
1520          ByteBuff sliceBuf = block.getBufferReadOnly();
1521          block.getMetaData(metaBuff);
1522          ioEngine.write(sliceBuf, offset);
1523          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
1524        } else {
1525          // Only used for testing.
1526          ByteBuffer bb = ByteBuffer.allocate(len);
1527          data.serialize(bb, true);
1528          ioEngine.write(bb, offset);
1529        }
1530        succ = true;
1531      } finally {
1532        if (!succ) {
1533          alloc.freeBlock(offset, len);
1534        }
1535      }
1536      realCacheSize.add(len);
1537      return bucketEntry;
1538    }
1539  }
1540
1541  /**
1542   * Only used in test n
1543   */
1544  void stopWriterThreads() throws InterruptedException {
1545    for (WriterThread writerThread : writerThreads) {
1546      writerThread.disableWriter();
1547      writerThread.interrupt();
1548      writerThread.join();
1549    }
1550  }
1551
1552  @Override
1553  public Iterator<CachedBlock> iterator() {
1554    // Don't bother with ramcache since stuff is in here only a little while.
1555    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
1556    return new Iterator<CachedBlock>() {
1557      private final long now = System.nanoTime();
1558
1559      @Override
1560      public boolean hasNext() {
1561        return i.hasNext();
1562      }
1563
1564      @Override
1565      public CachedBlock next() {
1566        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1567        return new CachedBlock() {
1568          @Override
1569          public String toString() {
1570            return BlockCacheUtil.toString(this, now);
1571          }
1572
1573          @Override
1574          public BlockPriority getBlockPriority() {
1575            return e.getValue().getPriority();
1576          }
1577
1578          @Override
1579          public BlockType getBlockType() {
1580            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
1581            return null;
1582          }
1583
1584          @Override
1585          public long getOffset() {
1586            return e.getKey().getOffset();
1587          }
1588
1589          @Override
1590          public long getSize() {
1591            return e.getValue().getLength();
1592          }
1593
1594          @Override
1595          public long getCachedTime() {
1596            return e.getValue().getCachedTime();
1597          }
1598
1599          @Override
1600          public String getFilename() {
1601            return e.getKey().getHfileName();
1602          }
1603
1604          @Override
1605          public int compareTo(CachedBlock other) {
1606            int diff = this.getFilename().compareTo(other.getFilename());
1607            if (diff != 0) return diff;
1608
1609            diff = Long.compare(this.getOffset(), other.getOffset());
1610            if (diff != 0) return diff;
1611            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1612              throw new IllegalStateException(
1613                "" + this.getCachedTime() + ", " + other.getCachedTime());
1614            }
1615            return Long.compare(other.getCachedTime(), this.getCachedTime());
1616          }
1617
1618          @Override
1619          public int hashCode() {
1620            return e.getKey().hashCode();
1621          }
1622
1623          @Override
1624          public boolean equals(Object obj) {
1625            if (obj instanceof CachedBlock) {
1626              CachedBlock cb = (CachedBlock) obj;
1627              return compareTo(cb) == 0;
1628            } else {
1629              return false;
1630            }
1631          }
1632        };
1633      }
1634
1635      @Override
1636      public void remove() {
1637        throw new UnsupportedOperationException();
1638      }
1639    };
1640  }
1641
  /**
   * BucketCache reports no sub-caches.
   * @return always {@code null}
   */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
1646
1647  public int getRpcRefCount(BlockCacheKey cacheKey) {
1648    BucketEntry bucketEntry = backingMap.get(cacheKey);
1649    if (bucketEntry != null) {
1650      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
1651    }
1652    return 0;
1653  }
1654
1655  float getAcceptableFactor() {
1656    return acceptableFactor;
1657  }
1658
1659  float getMinFactor() {
1660    return minFactor;
1661  }
1662
1663  float getExtraFreeFactor() {
1664    return extraFreeFactor;
1665  }
1666
1667  float getSingleFactor() {
1668    return singleFactor;
1669  }
1670
1671  float getMultiFactor() {
1672    return multiFactor;
1673  }
1674
1675  float getMemoryFactor() {
1676    return memoryFactor;
1677  }
1678
1679  /**
1680   * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
1681   */
1682  static class RAMCache {
1683    /**
1684     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
1685     * {@link RAMCache#get(BlockCacheKey)} and
1686     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee
1687     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
1688     * func method can execute exactly once only when the key is present(or absent) and under the
1689     * lock context. Otherwise, the reference count of block will be messed up. Notice that the
1690     * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
1691     */
1692    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
1693
1694    public boolean containsKey(BlockCacheKey key) {
1695      return delegate.containsKey(key);
1696    }
1697
1698    public RAMQueueEntry get(BlockCacheKey key) {
1699      return delegate.computeIfPresent(key, (k, re) -> {
1700        // It'll be referenced by RPC, so retain atomically here. if the get and retain is not
1701        // atomic, another thread may remove and release the block, when retaining in this thread we
1702        // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422)
1703        re.getData().retain();
1704        return re;
1705      });
1706    }
1707
1708    /**
1709     * Return the previous associated value, or null if absent. It has the same meaning as
1710     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
1711     */
1712    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
1713      AtomicBoolean absent = new AtomicBoolean(false);
1714      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
1715        // The RAMCache reference to this entry, so reference count should be increment.
1716        entry.getData().retain();
1717        absent.set(true);
1718        return entry;
1719      });
1720      return absent.get() ? null : re;
1721    }
1722
1723    public boolean remove(BlockCacheKey key) {
1724      return remove(key, re -> {
1725      });
1726    }
1727
1728    /**
1729     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
1730     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
1731     * exception. the consumer will access entry to remove before release its reference count.
1732     * Notice, don't change its reference count in the {@link Consumer}
1733     */
1734    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
1735      RAMQueueEntry previous = delegate.remove(key);
1736      action.accept(previous);
1737      if (previous != null) {
1738        previous.getData().release();
1739      }
1740      return previous != null;
1741    }
1742
1743    public boolean isEmpty() {
1744      return delegate.isEmpty();
1745    }
1746
1747    public void clear() {
1748      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
1749      while (it.hasNext()) {
1750        RAMQueueEntry re = it.next().getValue();
1751        it.remove();
1752        re.getData().release();
1753      }
1754    }
1755  }
1756}