001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
021
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.FileOutputStream;
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Comparator;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.NavigableSet;
034import java.util.PriorityQueue;
035import java.util.Set;
036import java.util.concurrent.ArrayBlockingQueue;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.ConcurrentSkipListSet;
041import java.util.concurrent.Executors;
042import java.util.concurrent.ScheduledExecutorService;
043import java.util.concurrent.TimeUnit;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.concurrent.atomic.LongAdder;
047import java.util.concurrent.locks.Lock;
048import java.util.concurrent.locks.ReentrantLock;
049import java.util.concurrent.locks.ReentrantReadWriteLock;
050import java.util.function.Consumer;
051import java.util.function.Function;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.Admin;
056import org.apache.hadoop.hbase.io.ByteBuffAllocator;
057import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
058import org.apache.hadoop.hbase.io.HeapSize;
059import org.apache.hadoop.hbase.io.hfile.BlockCache;
060import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
061import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
062import org.apache.hadoop.hbase.io.hfile.BlockPriority;
063import org.apache.hadoop.hbase.io.hfile.BlockType;
064import org.apache.hadoop.hbase.io.hfile.CacheStats;
065import org.apache.hadoop.hbase.io.hfile.Cacheable;
066import org.apache.hadoop.hbase.io.hfile.CachedBlock;
067import org.apache.hadoop.hbase.io.hfile.HFileBlock;
068import org.apache.hadoop.hbase.io.hfile.HFileContext;
069import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
070import org.apache.hadoop.hbase.nio.ByteBuff;
071import org.apache.hadoop.hbase.nio.RefCnt;
072import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.apache.hadoop.hbase.util.IdReadWriteLock;
076import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
077import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
078import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
079import org.apache.hadoop.util.StringUtils;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
085import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
086
087import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
088
089/**
090 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
091 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
092 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
093 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
094 * {@link FileIOEngine} to store/read the block data.
095 * <p>
096 * Eviction is via a similar algorithm as used in
097 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
098 * <p>
 * BucketCache can be used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to
 * decrease CMS GC and heap fragmentation.
102 * <p>
103 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
104 * enlarge cache space via a victim cache.
105 */
106@InterfaceAudience.Private
107public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Configuration keys for the priority bucket factors and the eviction thresholds */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Configuration key: whether to use strong references for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** Default priority bucket factors; the single, multi and memory factors must add up to 1.0 */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Period of the statistics logging task, in seconds (used as both initial delay and period) */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
139
  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;
  // In this map, store the block's meta data like offset, length
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. A
   * WriterThread, when it runs, takes whatever has been recently added and 'drains' the entries to
   * the BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if freeing space is in progress or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  /** Total length of the entries held by the allocator; reduced when a bucket entry is freed */
  private final LongAdder realCacheSize = new LongAdder();
  /** Heap size of the blocks currently waiting in the RAM cache */
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min, expressed in milliseconds
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read.
   * <p>
   */
  transient final IdReadWriteLock<Long> offsetLock;
200
201  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
202    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
203    if (nameComparison != 0) {
204      return nameComparison;
205    }
206    return Long.compare(a.getOffset(), b.getOffset());
207  });
208
  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  /** Value of the prefetch persistence path setting read from the configuration; may be null */
  private String prefetchedFileListPath;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  /** Time, in milliseconds, a caller may wait for room in a full writer queue; 0 disables waiting */
  private long queueAdditionWaitTime;
  /**
   * Name of the {@link java.security.MessageDigest} digest algorithm used to check persistent file
   * integrity, default algorithm is MD5
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
257
  /**
   * Creates a bucket cache with the default IO error toleration duration
   * ({@link #DEFAULT_ERROR_TOLERATION_DURATION}) and a freshly created HBase configuration.
   * @param ioEngineName    name selecting the IO engine implementation
   * @param capacity        cache capacity in bytes
   * @param blockSize       approximate block size in bytes
   * @param bucketSizes     bucket sizes handed to the {@link BucketAllocator}
   * @param writerThreadNum number of writer threads (and writer queues)
   * @param writerQLen      capacity of each writer queue
   * @param persistencePath path used to persist/restore the cache metadata, may be null
   * @throws IOException if the underlying constructor fails
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }
263
  /**
   * Creates and starts a bucket cache.
   * <p>
   * Reads the eviction factors and related settings from {@code conf} and validates them, creates
   * the IO engine and the {@link BucketAllocator}, tries to restore previously persisted state
   * when the engine is persistent and a persistence path is given, then starts the writer threads
   * and schedules the periodic statistics logging.
   * @param ioEngineName               name selecting the IO engine implementation
   * @param capacity                   cache capacity in bytes
   * @param blockSize                  approximate block size in bytes
   * @param bucketSizes                bucket sizes handed to the {@link BucketAllocator}
   * @param writerThreadNum            number of writer threads (and writer queues)
   * @param writerQLen                 capacity of each writer queue
   * @param persistencePath            path used to persist/restore the cache metadata, may be null
   * @param ioErrorsTolerationDuration duration of IO errors tolerated before the cache is disabled
   * @param conf                       configuration the factors and other settings are read from
   * @throws IllegalArgumentException if {@code capacity / blockSize} is not below
   *                                  {@link Integer#MAX_VALUE}, if a configured factor is invalid,
   *                                  or if the IO engine name is not understood
   * @throws IOException              if initialization fails, e.g. while creating the IO engine
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    // Pick the offset lock flavour: strong references, or a pool holding soft references
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    // The estimated block count is later used as an int map capacity, so it must fit in an int
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);

    // Fail fast on invalid factor settings before any thread is started
    sanityCheckConfigs();

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
      + ", useStrongRef: " + useStrongRef);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    this.allocFailLogPrevTs = 0;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    // One bounded queue per writer thread
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

    // A failed restore is only logged; construction continues with whatever state is present
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }
342
343  private void sanityCheckConfigs() {
344    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
345      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
346    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
347      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
348    Preconditions.checkArgument(minFactor <= acceptableFactor,
349      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
350    Preconditions.checkArgument(extraFreeFactor >= 0,
351      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
352    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
353      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
354    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
355      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
356    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
357      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
358    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
359      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
360        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
361  }
362
363  /**
364   * Called by the constructor to start the writer threads. Used by tests that need to override
365   * starting the threads.
366   */
367  protected void startWriterThreads() {
368    for (WriterThread thread : writerThreads) {
369      thread.start();
370    }
371  }
372
  /** Returns the current value of the {@code cacheEnabled} flag. */
  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }
376
  /** Returns the capacity, in bytes, this cache was constructed with. */
  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }
381
  /** Returns the string representation ({@code toString()}) of the IO engine in use. */
  public String getIoEngine() {
    return ioEngine.toString();
  }
385
386  /**
387   * Get the IOEngine from the IO engine name
388   * @return the IOEngine
389   */
390  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
391    throws IOException {
392    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
393      // In order to make the usage simple, we only need the prefix 'files:' in
394      // document whether one or multiple file(s), but also support 'file:' for
395      // the compatibility
396      String[] filePaths =
397        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
398      return new FileIOEngine(capacity, persistencePath != null, filePaths);
399    } else if (ioEngineName.startsWith("offheap")) {
400      return new ByteBufferIOEngine(capacity);
401    } else if (ioEngineName.startsWith("mmap:")) {
402      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
403    } else if (ioEngineName.startsWith("pmem:")) {
404      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
405      // device. Since the persistent memory device has its own address space the contents
406      // mapped to this address space does not get swapped out like in the case of mmapping
407      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
408      // can be directly referred to without having to copy them onheap. Once the RPC is done,
409      // the blocks can be returned back as in case of ByteBufferIOEngine.
410      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
411    } else {
412      throw new IllegalArgumentException(
413        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
414    }
415  }
416
  /**
   * Cache the block with the specified name and buffer. The block is cached as not in-memory.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }
426
  /**
   * Cache the block with the specified name and buffer, without waiting if the writer queue is
   * full.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }
437
  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey      block's cache key
   * @param cachedItem    block buffer
   * @param inMemory      if block is in-memory
   * @param waitWhenCache if true, wait for room in a full writer queue; this only takes effect
   *                      when the configured queue addition wait time is greater than 0
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }
449
450  /**
451   * Cache the block to ramCache
452   * @param cacheKey   block's cache key
453   * @param cachedItem block buffer
454   * @param inMemory   if block is in-memory
455   * @param wait       if true, blocking wait when queue is full
456   */
457  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
458    boolean wait) {
459    if (cacheEnabled) {
460      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
461        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
462          BucketEntry bucketEntry = backingMap.get(cacheKey);
463          if (bucketEntry != null && bucketEntry.isRpcRef()) {
464            // avoid replace when there are RPC refs for the bucket entry in bucket cache
465            return;
466          }
467          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
468        }
469      } else {
470        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
471      }
472    }
473  }
474
  /**
   * Decides whether a block already cached under {@code cacheKey} should be replaced by
   * {@code newBlock}; delegates to {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}.
   * @param cacheKey key of the block that is already cached
   * @param newBlock the block offered as replacement
   * @return the decision made by {@link BlockCacheUtil}
   */
  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }
478
479  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
480    boolean inMemory, boolean wait) {
481    if (!cacheEnabled) {
482      return;
483    }
484    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
485      cacheKey.setBlockType(cachedItem.getBlockType());
486    }
487    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
488    // Stuff the entry into the RAM cache so it can get drained to the persistent store
489    RAMQueueEntry re =
490      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
491    /**
492     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
493     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
494     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
495     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
496     * one, then the heap size will mess up (HBASE-20789)
497     */
498    if (ramCache.putIfAbsent(cacheKey, re) != null) {
499      return;
500    }
501    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
502    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
503    boolean successfulAddition = false;
504    if (wait) {
505      try {
506        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
507      } catch (InterruptedException e) {
508        Thread.currentThread().interrupt();
509      }
510    } else {
511      successfulAddition = bq.offer(re);
512    }
513    if (!successfulAddition) {
514      ramCache.remove(cacheKey);
515      cacheStats.failInsert();
516    } else {
517      this.blockNumber.increment();
518      this.heapSize.add(cachedItem.heapSize());
519    }
520  }
521
522  /**
523   * Get the buffer of the block with the specified key.
524   * @param key                block's cache key
525   * @param caching            true if the caller caches blocks on cache misses
526   * @param repeat             Whether this is a repeat lookup for the same block
527   * @param updateCacheMetrics Whether we should update cache metrics or not
528   * @return buffer of specified cache key, or null if not in cache
529   */
530  @Override
531  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
532    boolean updateCacheMetrics) {
533    if (!cacheEnabled) {
534      return null;
535    }
536    RAMQueueEntry re = ramCache.get(key);
537    if (re != null) {
538      if (updateCacheMetrics) {
539        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
540      }
541      re.access(accessCount.incrementAndGet());
542      return re.getData();
543    }
544    BucketEntry bucketEntry = backingMap.get(key);
545    if (bucketEntry != null) {
546      long start = System.nanoTime();
547      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
548      try {
549        lock.readLock().lock();
550        // We can not read here even if backingMap does contain the given key because its offset
551        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
552        // existence here.
553        if (bucketEntry.equals(backingMap.get(key))) {
554          // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the
555          // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
556          // the same BucketEntry, then all of the three will share the same refCnt.
557          Cacheable cachedBlock = ioEngine.read(bucketEntry);
558          if (ioEngine.usesSharedMemory()) {
559            // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
560            // same RefCnt, do retain here, in order to count the number of RPC references
561            cachedBlock.retain();
562          }
563          // Update the cache statistics.
564          if (updateCacheMetrics) {
565            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
566            cacheStats.ioHit(System.nanoTime() - start);
567          }
568          bucketEntry.access(accessCount.incrementAndGet());
569          if (this.ioErrorStartTime > 0) {
570            ioErrorStartTime = -1;
571          }
572          return cachedBlock;
573        }
574      } catch (IOException ioex) {
575        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
576        checkIOErrorIsTolerated();
577      } finally {
578        lock.readLock().unlock();
579      }
580    }
581    if (!repeat && updateCacheMetrics) {
582      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
583    }
584    return null;
585  }
586
  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}.
   * It marks the entry as evicted, removes the key from the per-file key set and updates the
   * counters.
   * @param cacheKey                 key of the evicted block
   * @param bucketEntry              entry that was removed from the backing map
   * @param decrementBlockNumber     whether the block count should be decremented here
   * @param evictedByEvictionProcess whether the eviction should be recorded in the cache stats
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
  }
601
  /**
   * Actually free the {@link BucketEntry}: return its space to the allocator and subtract its
   * length from the real cache size. This must only be invoked when the
   * {@link BucketEntry#refCnt} has become 0.
   * @param bucketEntry the entry whose space is released
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }
610
  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * Firstly, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from bucket
   * cache by calling {@link BucketEntry#markedAsEvicted}. If there are still some RPCs referring
   * to this block, the block can only be de-allocated when all of them release the block.
   * <p>
   * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak.
   * @param cacheKey Block to evict
   * @return true to indicate whether we've evicted successfully or not.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }
631
632  /**
633   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
634   * {@link BucketCache#ramCache}. <br/>
635   * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
636   * {@link BucketEntry} could be removed.
637   * @param cacheKey    {@link BlockCacheKey} to evict.
638   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
639   * @return true to indicate whether we've evicted successfully or not.
640   */
641  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
642    boolean evictedByEvictionProcess) {
643    if (!cacheEnabled) {
644      return false;
645    }
646    boolean existedInRamCache = removeFromRamCache(cacheKey);
647    if (bucketEntry == null) {
648      bucketEntry = backingMap.get(cacheKey);
649    }
650    final BucketEntry bucketEntryToUse = bucketEntry;
651
652    if (bucketEntryToUse == null) {
653      if (existedInRamCache && evictedByEvictionProcess) {
654        cacheStats.evicted(0, cacheKey.isPrimary());
655      }
656      return existedInRamCache;
657    } else {
658      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
659        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
660          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
661          return true;
662        }
663        return false;
664      });
665    }
666  }
667
668  /**
669   * <pre>
670   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
671   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
672   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
673   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
674   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
675   *   it is the return value of current {@link BucketCache#createRecycler} method.
676   *
677   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
678   *   it is {@link ByteBuffAllocator#putbackBuffer}.
679   * </pre>
680   */
681  private Recycler createRecycler(final BucketEntry bucketEntry) {
682    return () -> {
683      freeBucketEntry(bucketEntry);
684      return;
685    };
686  }
687
688  /**
689   * NOTE: This method is only for test.
690   */
691  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
692    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
693    if (bucketEntry == null) {
694      return false;
695    }
696    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
697  }
698
699  /**
700   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
701   * {@link BucketEntry#isRpcRef} is false. <br/>
702   * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
703   * {@link BucketEntry} could be removed.
704   * @param blockCacheKey {@link BlockCacheKey} to evict.
705   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true if the block was evicted, false otherwise.
707   */
708  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
709    if (!bucketEntry.isRpcRef()) {
710      return doEvictBlock(blockCacheKey, bucketEntry, true);
711    }
712    return false;
713  }
714
715  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
716    return ramCache.remove(cacheKey, re -> {
717      if (re != null) {
718        this.blockNumber.decrement();
719        this.heapSize.add(-1 * re.getData().heapSize());
720      }
721    });
722  }
723
724  /*
725   * Statistics thread. Periodically output cache statistics to the log.
726   */
727  private static class StatisticsThread extends Thread {
728    private final BucketCache bucketCache;
729
730    public StatisticsThread(BucketCache bucketCache) {
731      super("BucketCacheStatsThread");
732      setDaemon(true);
733      this.bucketCache = bucketCache;
734    }
735
736    @Override
737    public void run() {
738      bucketCache.logStats();
739    }
740  }
741
742  public void logStats() {
743    long totalSize = bucketAllocator.getTotalSize();
744    long usedSize = bucketAllocator.getUsedSize();
745    long freeSize = totalSize - usedSize;
746    long cacheSize = getRealCacheSize();
747    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
748      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
749      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
750      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
751      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
752      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
753      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
754      + (cacheStats.getHitCount() == 0
755        ? "0,"
756        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
757      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
758      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
759      + (cacheStats.getHitCachingCount() == 0
760        ? "0,"
761        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
762      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
763      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
764      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
765    cacheStats.reset();
766
767    bucketAllocator.logDebugStatistics();
768  }
769
770  public long getRealCacheSize() {
771    return this.realCacheSize.sum();
772  }
773
774  public long acceptableSize() {
775    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
776  }
777
778  long getPartitionSize(float partitionFactor) {
779    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
780  }
781
782  /**
   * Return the count of bucket sizes that still need space to be freed
784   */
785  private int bucketSizesAboveThresholdCount(float minFactor) {
786    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
787    int fullCount = 0;
788    for (int i = 0; i < stats.length; i++) {
789      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
790      freeGoal = Math.max(freeGoal, 1);
791      if (stats[i].freeCount() < freeGoal) {
792        fullCount++;
793      }
794    }
795    return fullCount;
796  }
797
798  /**
799   * This method will find the buckets that are minimally occupied and are not reference counted and
800   * will free them completely without any constraint on the access times of the elements, and as a
801   * process will completely free at most the number of buckets passed, sometimes it might not due
802   * to changing refCounts
803   * @param completelyFreeBucketsNeeded number of buckets to free
804   **/
805  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
806    if (completelyFreeBucketsNeeded != 0) {
807      // First we will build a set where the offsets are reference counted, usually
808      // this set is small around O(Handler Count) unless something else is wrong
809      Set<Integer> inUseBuckets = new HashSet<>();
810      backingMap.forEach((k, be) -> {
811        if (be.isRpcRef()) {
812          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
813        }
814      });
815      Set<Integer> candidateBuckets =
816        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
817      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
818        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
819          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
820        }
821      }
822    }
823  }
824
825  /**
826   * Free the space if the used size reaches acceptableSize() or one size block couldn't be
827   * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
828   * blocks evicted
829   * @param why Why we are being called
830   */
831  void freeSpace(final String why) {
832    // Ensure only one freeSpace progress at a time
833    if (!freeSpaceLock.tryLock()) {
834      return;
835    }
836    try {
837      freeInProgress = true;
838      long bytesToFreeWithoutExtra = 0;
839      // Calculate free byte for each bucketSizeinfo
840      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
841      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
842      long[] bytesToFreeForBucket = new long[stats.length];
843      for (int i = 0; i < stats.length; i++) {
844        bytesToFreeForBucket[i] = 0;
845        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
846        freeGoal = Math.max(freeGoal, 1);
847        if (stats[i].freeCount() < freeGoal) {
848          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
849          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
850          if (msgBuffer != null) {
851            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
852              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
853          }
854        }
855      }
856      if (msgBuffer != null) {
857        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
858      }
859
860      if (bytesToFreeWithoutExtra <= 0) {
861        return;
862      }
863      long currentSize = bucketAllocator.getUsedSize();
864      long totalSize = bucketAllocator.getTotalSize();
865      if (LOG.isDebugEnabled() && msgBuffer != null) {
866        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
867          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
868          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
869          + StringUtils.byteDesc(totalSize));
870      }
871
872      long bytesToFreeWithExtra =
873        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
874
875      // Instantiate priority buckets
876      BucketEntryGroup bucketSingle =
877        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
878      BucketEntryGroup bucketMulti =
879        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
880      BucketEntryGroup bucketMemory =
881        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
882
883      // Scan entire map putting bucket entry into appropriate bucket entry
884      // group
885      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
886        switch (bucketEntryWithKey.getValue().getPriority()) {
887          case SINGLE: {
888            bucketSingle.add(bucketEntryWithKey);
889            break;
890          }
891          case MULTI: {
892            bucketMulti.add(bucketEntryWithKey);
893            break;
894          }
895          case MEMORY: {
896            bucketMemory.add(bucketEntryWithKey);
897            break;
898          }
899        }
900      }
901
902      PriorityQueue<BucketEntryGroup> bucketQueue =
903        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
904
905      bucketQueue.add(bucketSingle);
906      bucketQueue.add(bucketMulti);
907      bucketQueue.add(bucketMemory);
908
909      int remainingBuckets = bucketQueue.size();
910      long bytesFreed = 0;
911
912      BucketEntryGroup bucketGroup;
913      while ((bucketGroup = bucketQueue.poll()) != null) {
914        long overflow = bucketGroup.overflow();
915        if (overflow > 0) {
916          long bucketBytesToFree =
917            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
918          bytesFreed += bucketGroup.free(bucketBytesToFree);
919        }
920        remainingBuckets--;
921      }
922
923      // Check and free if there are buckets that still need freeing of space
924      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
925        bucketQueue.clear();
926        remainingBuckets = 3;
927
928        bucketQueue.add(bucketSingle);
929        bucketQueue.add(bucketMulti);
930        bucketQueue.add(bucketMemory);
931
932        while ((bucketGroup = bucketQueue.poll()) != null) {
933          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
934          bytesFreed += bucketGroup.free(bucketBytesToFree);
935          remainingBuckets--;
936        }
937      }
938
939      // Even after the above free we might still need freeing because of the
940      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
941      // there might be some buckets where the occupancy is very sparse and thus are not
942      // yielding the free for the other bucket sizes, the fix for this to evict some
943      // of the buckets, we do this by evicting the buckets that are least fulled
944      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));
945
946      if (LOG.isDebugEnabled()) {
947        long single = bucketSingle.totalSize();
948        long multi = bucketMulti.totalSize();
949        long memory = bucketMemory.totalSize();
950        if (LOG.isDebugEnabled()) {
951          LOG.debug("Bucket cache free space completed; " + "freed="
952            + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
953            + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
954            + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
955        }
956      }
957
958    } catch (Throwable t) {
959      LOG.warn("Failed freeing space", t);
960    } finally {
961      cacheStats.evict();
962      freeInProgress = false;
963      freeSpaceLock.unlock();
964    }
965  }
966
967  // This handles flushing the RAM cache to IOEngine.
968  class WriterThread extends Thread {
969    private final BlockingQueue<RAMQueueEntry> inputQueue;
970    private volatile boolean writerEnabled = true;
971    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
972
973    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
974      super("BucketCacheWriterThread");
975      this.inputQueue = queue;
976    }
977
978    // Used for test
979    void disableWriter() {
980      this.writerEnabled = false;
981    }
982
983    @Override
984    public void run() {
985      List<RAMQueueEntry> entries = new ArrayList<>();
986      try {
987        while (cacheEnabled && writerEnabled) {
988          try {
989            try {
990              // Blocks
991              entries = getRAMQueueEntries(inputQueue, entries);
992            } catch (InterruptedException ie) {
993              if (!cacheEnabled || !writerEnabled) {
994                break;
995              }
996            }
997            doDrain(entries, metaBuff);
998          } catch (Exception ioe) {
999            LOG.error("WriterThread encountered error", ioe);
1000          }
1001        }
1002      } catch (Throwable t) {
1003        LOG.warn("Failed doing drain", t);
1004      }
1005      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
1006    }
1007  }
1008
1009  /**
   * Put the new bucket entry into backingMap. Note that we are allowed to replace an existing
   * cached block with a new block for the same cache key. There is a corner case: one thread
   * caches a block in ramCache, copies it to the io-engine and adds a bucket entry to backingMap;
   * then another new block with the same cache key goes through the same steps. If the previous
   * bucket entry is not evicted, a memory leak happens because the previous bucketEntry is gone
   * but the bucketAllocator does not free its memory.
1016   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
1017   *      cacheKey, Cacheable newBlock)
1018   * @param key         Block cache key
1019   * @param bucketEntry Bucket entry to put into backingMap.
1020   */
1021  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
1022    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
1023    if (previousEntry != null && previousEntry != bucketEntry) {
1024      previousEntry.withWriteLock(offsetLock, () -> {
1025        blockEvicted(key, previousEntry, false, false);
1026        return null;
1027      });
1028    }
1029  }
1030
1031  /**
1032   * Prepare and return a warning message for Bucket Allocator Exception
1033   * @param fle The exception
1034   * @param re  The RAMQueueEntry for which the exception was thrown.
1035   * @return A warning message created from the input RAMQueueEntry object.
1036   */
1037  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1038    final RAMQueueEntry re) {
1039    final StringBuilder sb = new StringBuilder();
1040    sb.append("Most recent failed allocation after ");
1041    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1042    sb.append(" ms;");
1043    if (re != null) {
1044      if (re.getData() instanceof HFileBlock) {
1045        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1046        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1047        final String tableName = Bytes.toString(fileContext.getTableName());
1048        if (tableName != null) {
1049          sb.append(" Table: ");
1050          sb.append(tableName);
1051        }
1052        if (columnFamily != null) {
1053          sb.append(" CF: ");
1054          sb.append(columnFamily);
1055        }
1056        sb.append(" HFile: ");
1057        if (fileContext.getHFileName() != null) {
1058          sb.append(fileContext.getHFileName());
1059        } else {
1060          sb.append(re.getKey());
1061        }
1062      } else {
1063        sb.append(" HFile: ");
1064        sb.append(re.getKey());
1065      }
1066    }
1067    sb.append(" Message: ");
1068    sb.append(fle.getMessage());
1069    return sb.toString();
1070  }
1071
1072  /**
1073   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
1074   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
1075   * references and we'll OOME.
1076   * @param entries Presumes list passed in here will be processed by this invocation only. No
1077   *                interference expected.
1078   */
1079  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1080    if (entries.isEmpty()) {
1081      return;
1082    }
1083    // This method is a little hard to follow. We run through the passed in entries and for each
1084    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1085    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1086    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
1087    // filling ramCache. We do the clean up by again running through the passed in entries
1088    // doing extra work when we find a non-null bucketEntries corresponding entry.
1089    final int size = entries.size();
1090    BucketEntry[] bucketEntries = new BucketEntry[size];
1091    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1092    // when we go to add an entry by going around the loop again without upping the index.
1093    int index = 0;
1094    while (cacheEnabled && index < size) {
1095      RAMQueueEntry re = null;
1096      try {
1097        re = entries.get(index);
1098        if (re == null) {
1099          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1100          index++;
1101          continue;
1102        }
1103        BlockCacheKey cacheKey = re.getKey();
1104        if (ramCache.containsKey(cacheKey)) {
1105          blocksByHFile.add(cacheKey);
1106        }
1107        // Reset the position for reuse.
1108        // It should be guaranteed that the data in the metaBuff has been transferred to the
1109        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1110        // transferred with our current IOEngines. Should take care, when we have new kinds of
1111        // IOEngine in the future.
1112        metaBuff.clear();
1113        BucketEntry bucketEntry =
1114          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
1115        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1116        bucketEntries[index] = bucketEntry;
1117        if (ioErrorStartTime > 0) {
1118          ioErrorStartTime = -1;
1119        }
1120        index++;
1121      } catch (BucketAllocatorException fle) {
1122        long currTs = EnvironmentEdgeManager.currentTime();
1123        cacheStats.allocationFailed(); // Record the warning.
1124        if (
1125          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1126        ) {
1127          LOG.warn(getAllocationFailWarningMessage(fle, re));
1128          allocFailLogPrevTs = currTs;
1129        }
1130        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1131        bucketEntries[index] = null;
1132        index++;
1133      } catch (CacheFullException cfe) {
1134        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1135        if (!freeInProgress) {
1136          freeSpace("Full!");
1137        } else {
1138          Thread.sleep(50);
1139        }
1140      } catch (IOException ioex) {
1141        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1142        LOG.error("Failed writing to bucket cache", ioex);
1143        checkIOErrorIsTolerated();
1144      }
1145    }
1146
1147    // Make sure data pages are written on media before we update maps.
1148    try {
1149      ioEngine.sync();
1150    } catch (IOException ioex) {
1151      LOG.error("Failed syncing IO engine", ioex);
1152      checkIOErrorIsTolerated();
1153      // Since we failed sync, free the blocks in bucket allocator
1154      for (int i = 0; i < entries.size(); ++i) {
1155        BucketEntry bucketEntry = bucketEntries[i];
1156        if (bucketEntry != null) {
1157          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
1158          bucketEntries[i] = null;
1159        }
1160      }
1161    }
1162
1163    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1164    // success or error.
1165    for (int i = 0; i < size; ++i) {
1166      BlockCacheKey key = entries.get(i).getKey();
1167      // Only add if non-null entry.
1168      if (bucketEntries[i] != null) {
1169        putIntoBackingMap(key, bucketEntries[i]);
1170      }
1171      // Always remove from ramCache even if we failed adding it to the block cache above.
1172      boolean existed = ramCache.remove(key, re -> {
1173        if (re != null) {
1174          heapSize.add(-1 * re.getData().heapSize());
1175        }
1176      });
1177      if (!existed && bucketEntries[i] != null) {
1178        // Block should have already been evicted. Remove it and free space.
1179        final BucketEntry bucketEntry = bucketEntries[i];
1180        bucketEntry.withWriteLock(offsetLock, () -> {
1181          if (backingMap.remove(key, bucketEntry)) {
1182            blockEvicted(key, bucketEntry, false, false);
1183          }
1184          return null;
1185        });
1186      }
1187    }
1188
1189    long used = bucketAllocator.getUsedSize();
1190    if (used > acceptableSize()) {
1191      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1192    }
1193    return;
1194  }
1195
1196  /**
1197   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1198   * returning.
1199   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1200   *                   in case.
1201   * @param q          The queue to take from.
1202   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1203   */
1204  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1205    List<RAMQueueEntry> receptacle) throws InterruptedException {
1206    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1207    // ok even if list grew to accommodate thousands.
1208    receptacle.clear();
1209    receptacle.add(q.take());
1210    q.drainTo(receptacle);
1211    return receptacle;
1212  }
1213
1214  /**
1215   * @see #retrieveFromFile(int[])
1216   */
1217  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1218      justification = "false positive, try-with-resources ensures close is called.")
1219  private void persistToFile() throws IOException {
1220    assert !cacheEnabled;
1221    if (!ioEngine.isPersistent()) {
1222      throw new IOException("Attempt to persist non-persistent cache mappings!");
1223    }
1224    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
1225      fos.write(ProtobufMagic.PB_MAGIC);
1226      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
1227    }
1228    if (prefetchedFileListPath != null) {
1229      PrefetchExecutor.persistToFile(prefetchedFileListPath);
1230    }
1231  }
1232
1233  /**
1234   * @see #persistToFile()
1235   */
1236  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1237    File persistenceFile = new File(persistencePath);
1238    if (!persistenceFile.exists()) {
1239      return;
1240    }
1241    assert !cacheEnabled;
1242    if (prefetchedFileListPath != null) {
1243      PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
1244    }
1245
1246    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
1247      int pblen = ProtobufMagic.lengthOfPBMagic();
1248      byte[] pbuf = new byte[pblen];
1249      int read = in.read(pbuf);
1250      if (read != pblen) {
1251        throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
1252          + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
1253      }
1254      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
1255        // In 3.0 we have enough flexibility to dump the old cache data.
1256        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1257        throw new IOException(
1258          "Persistence file does not start with protobuf magic number. " + persistencePath);
1259      }
1260      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1261      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1262      blockNumber.add(backingMap.size());
1263    }
1264  }
1265
1266  /**
1267   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
1268   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
1269   *
1270   * <pre>
1271   *   File f = ...
1272   *   try (FileInputStream fis = new FileInputStream(f)) {
1273   *     // use the input stream
1274   *   } finally {
1275   *     if (!f.delete()) throw new IOException("failed to delete");
1276   *   }
1277   * </pre>
1278   *
1279   * @param file the file to read and delete
1280   * @return a FileInputStream for the given file
1281   * @throws IOException if there is a problem creating the stream
1282   */
1283  private FileInputStream deleteFileOnClose(final File file) throws IOException {
1284    return new FileInputStream(file) {
1285      private File myFile;
1286
1287      private FileInputStream init(File file) {
1288        myFile = file;
1289        return this;
1290      }
1291
1292      @Override
1293      public void close() throws IOException {
1294        // close() will be called during try-with-resources and it will be
1295        // called by finalizer thread during GC. To avoid double-free resource,
1296        // set myFile to null after the first call.
1297        if (myFile == null) {
1298          return;
1299        }
1300
1301        super.close();
1302        if (!myFile.delete()) {
1303          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
1304        }
1305        myFile = null;
1306      }
1307    }.init(file);
1308  }
1309
1310  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1311    throws IOException {
1312    if (capacitySize != cacheCapacity) {
1313      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1314        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1315    }
1316    if (!ioEngine.getClass().getName().equals(ioclass)) {
1317      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1318        + ioEngine.getClass().getName());
1319    }
1320    if (!backingMap.getClass().getName().equals(mapclass)) {
1321      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1322        + backingMap.getClass().getName());
1323    }
1324  }
1325
1326  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1327    if (proto.hasChecksum()) {
1328      ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1329        algorithm);
1330    } else {
1331      // if has not checksum, it means the persistence file is old format
1332      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
1333    }
1334    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1335    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1336      this::createRecycler);
1337  }
1338
1339  /**
1340   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1341   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1342   */
1343  private void checkIOErrorIsTolerated() {
1344    long now = EnvironmentEdgeManager.currentTime();
1345    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1346    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1347    if (ioErrorStartTimeTmp > 0) {
1348      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1349        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1350          + "ms, disabling cache, please check your IOEngine");
1351        disableCache();
1352      }
1353    } else {
1354      this.ioErrorStartTime = now;
1355    }
1356  }
1357
1358  /**
1359   * Used to shut down the cache -or- turn it off in the case of something broken.
1360   */
1361  private void disableCache() {
1362    if (!cacheEnabled) return;
1363    cacheEnabled = false;
1364    ioEngine.shutdown();
1365    this.scheduleThreadPool.shutdown();
1366    for (int i = 0; i < writerThreads.length; ++i)
1367      writerThreads[i].interrupt();
1368    this.ramCache.clear();
1369    if (!ioEngine.isPersistent() || persistencePath == null) {
1370      // If persistent ioengine and a path, we will serialize out the backingMap.
1371      this.backingMap.clear();
1372    }
1373  }
1374
1375  private void join() throws InterruptedException {
1376    for (int i = 0; i < writerThreads.length; ++i)
1377      writerThreads[i].join();
1378  }
1379
1380  @Override
1381  public void shutdown() {
1382    disableCache();
1383    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1384      + persistencePath);
1385    if (ioEngine.isPersistent() && persistencePath != null) {
1386      try {
1387        join();
1388        persistToFile();
1389      } catch (IOException ex) {
1390        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1391      } catch (InterruptedException e) {
1392        LOG.warn("Failed to persist data on exit", e);
1393      }
1394    }
1395  }
1396
1397  @Override
1398  public CacheStats getStats() {
1399    return cacheStats;
1400  }
1401
1402  public BucketAllocator getAllocator() {
1403    return this.bucketAllocator;
1404  }
1405
1406  @Override
1407  public long heapSize() {
1408    return this.heapSize.sum();
1409  }
1410
1411  @Override
1412  public long size() {
1413    return this.realCacheSize.sum();
1414  }
1415
1416  @Override
1417  public long getCurrentDataSize() {
1418    return size();
1419  }
1420
1421  @Override
1422  public long getFreeSize() {
1423    return this.bucketAllocator.getFreeSize();
1424  }
1425
1426  @Override
1427  public long getBlockCount() {
1428    return this.blockNumber.sum();
1429  }
1430
1431  @Override
1432  public long getDataBlockCount() {
1433    return getBlockCount();
1434  }
1435
1436  @Override
1437  public long getCurrentSize() {
1438    return this.bucketAllocator.getUsedSize();
1439  }
1440
1441  protected String getAlgorithm() {
1442    return algorithm;
1443  }
1444
1445  /**
1446   * Evicts all blocks for a specific HFile.
1447   * <p>
1448   * This is used for evict-on-close to remove all blocks of a specific HFile.
1449   * @return the number of blocks evicted
1450   */
1451  @Override
1452  public int evictBlocksByHfileName(String hfileName) {
1453    PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
1454    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
1455      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1456
1457    int numEvicted = 0;
1458    for (BlockCacheKey key : keySet) {
1459      if (evictBlock(key)) {
1460        ++numEvicted;
1461      }
1462    }
1463
1464    return numEvicted;
1465  }
1466
1467  /**
1468   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1469   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1470   * number of elements out of each according to configuration parameters and their relative sizes.
1471   */
1472  private class BucketEntryGroup {
1473
1474    private CachedEntryQueue queue;
1475    private long totalSize = 0;
1476    private long bucketSize;
1477
1478    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1479      this.bucketSize = bucketSize;
1480      queue = new CachedEntryQueue(bytesToFree, blockSize);
1481      totalSize = 0;
1482    }
1483
1484    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1485      totalSize += block.getValue().getLength();
1486      queue.add(block);
1487    }
1488
1489    public long free(long toFree) {
1490      Map.Entry<BlockCacheKey, BucketEntry> entry;
1491      long freedBytes = 0;
1492      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1493      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1494      while ((entry = queue.pollLast()) != null) {
1495        BlockCacheKey blockCacheKey = entry.getKey();
1496        BucketEntry be = entry.getValue();
1497        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1498          freedBytes += be.getLength();
1499        }
1500        if (freedBytes >= toFree) {
1501          return freedBytes;
1502        }
1503      }
1504      return freedBytes;
1505    }
1506
1507    public long overflow() {
1508      return totalSize - bucketSize;
1509    }
1510
1511    public long totalSize() {
1512      return totalSize;
1513    }
1514  }
1515
1516  /**
1517   * Block Entry stored in the memory with key,data and so on
1518   */
1519  static class RAMQueueEntry {
1520    private final BlockCacheKey key;
1521    private final Cacheable data;
1522    private long accessCounter;
1523    private boolean inMemory;
1524
1525    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
1526      this.key = bck;
1527      this.data = data;
1528      this.accessCounter = accessCounter;
1529      this.inMemory = inMemory;
1530    }
1531
1532    public Cacheable getData() {
1533      return data;
1534    }
1535
1536    public BlockCacheKey getKey() {
1537      return key;
1538    }
1539
1540    public void access(long accessCounter) {
1541      this.accessCounter = accessCounter;
1542    }
1543
1544    private ByteBuffAllocator getByteBuffAllocator() {
1545      if (data instanceof HFileBlock) {
1546        return ((HFileBlock) data).getByteBuffAllocator();
1547      }
1548      return ByteBuffAllocator.HEAP;
1549    }
1550
1551    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
1552      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
1553      ByteBuffer metaBuff) throws IOException {
1554      int len = data.getSerializedLength();
1555      // This cacheable thing can't be serialized
1556      if (len == 0) {
1557        return null;
1558      }
1559      long offset = alloc.allocateBlock(len);
1560      boolean succ = false;
1561      BucketEntry bucketEntry = null;
1562      try {
1563        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
1564          getByteBuffAllocator());
1565        bucketEntry.setDeserializerReference(data.getDeserializer());
1566        if (data instanceof HFileBlock) {
1567          // If an instance of HFileBlock, save on some allocations.
1568          HFileBlock block = (HFileBlock) data;
1569          ByteBuff sliceBuf = block.getBufferReadOnly();
1570          block.getMetaData(metaBuff);
1571          ioEngine.write(sliceBuf, offset);
1572          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
1573        } else {
1574          // Only used for testing.
1575          ByteBuffer bb = ByteBuffer.allocate(len);
1576          data.serialize(bb, true);
1577          ioEngine.write(bb, offset);
1578        }
1579        succ = true;
1580      } finally {
1581        if (!succ) {
1582          alloc.freeBlock(offset, len);
1583        }
1584      }
1585      realCacheSize.add(len);
1586      return bucketEntry;
1587    }
1588  }
1589
1590  /**
1591   * Only used in test
1592   */
1593  void stopWriterThreads() throws InterruptedException {
1594    for (WriterThread writerThread : writerThreads) {
1595      writerThread.disableWriter();
1596      writerThread.interrupt();
1597      writerThread.join();
1598    }
1599  }
1600
1601  @Override
1602  public Iterator<CachedBlock> iterator() {
1603    // Don't bother with ramcache since stuff is in here only a little while.
1604    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
1605    return new Iterator<CachedBlock>() {
1606      private final long now = System.nanoTime();
1607
1608      @Override
1609      public boolean hasNext() {
1610        return i.hasNext();
1611      }
1612
1613      @Override
1614      public CachedBlock next() {
1615        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1616        return new CachedBlock() {
1617          @Override
1618          public String toString() {
1619            return BlockCacheUtil.toString(this, now);
1620          }
1621
1622          @Override
1623          public BlockPriority getBlockPriority() {
1624            return e.getValue().getPriority();
1625          }
1626
1627          @Override
1628          public BlockType getBlockType() {
1629            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
1630            return null;
1631          }
1632
1633          @Override
1634          public long getOffset() {
1635            return e.getKey().getOffset();
1636          }
1637
1638          @Override
1639          public long getSize() {
1640            return e.getValue().getLength();
1641          }
1642
1643          @Override
1644          public long getCachedTime() {
1645            return e.getValue().getCachedTime();
1646          }
1647
1648          @Override
1649          public String getFilename() {
1650            return e.getKey().getHfileName();
1651          }
1652
1653          @Override
1654          public int compareTo(CachedBlock other) {
1655            int diff = this.getFilename().compareTo(other.getFilename());
1656            if (diff != 0) return diff;
1657
1658            diff = Long.compare(this.getOffset(), other.getOffset());
1659            if (diff != 0) return diff;
1660            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1661              throw new IllegalStateException(
1662                "" + this.getCachedTime() + ", " + other.getCachedTime());
1663            }
1664            return Long.compare(other.getCachedTime(), this.getCachedTime());
1665          }
1666
1667          @Override
1668          public int hashCode() {
1669            return e.getKey().hashCode();
1670          }
1671
1672          @Override
1673          public boolean equals(Object obj) {
1674            if (obj instanceof CachedBlock) {
1675              CachedBlock cb = (CachedBlock) obj;
1676              return compareTo(cb) == 0;
1677            } else {
1678              return false;
1679            }
1680          }
1681        };
1682      }
1683
1684      @Override
1685      public void remove() {
1686        throw new UnsupportedOperationException();
1687      }
1688    };
1689  }
1690
  /**
   * Always returns {@code null}; this implementation exposes no array of block caches.
   * @return {@code null}
   */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
1695
1696  public int getRpcRefCount(BlockCacheKey cacheKey) {
1697    BucketEntry bucketEntry = backingMap.get(cacheKey);
1698    if (bucketEntry != null) {
1699      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
1700    }
1701    return 0;
1702  }
1703
1704  float getAcceptableFactor() {
1705    return acceptableFactor;
1706  }
1707
1708  float getMinFactor() {
1709    return minFactor;
1710  }
1711
1712  float getExtraFreeFactor() {
1713    return extraFreeFactor;
1714  }
1715
1716  float getSingleFactor() {
1717    return singleFactor;
1718  }
1719
1720  float getMultiFactor() {
1721    return multiFactor;
1722  }
1723
1724  float getMemoryFactor() {
1725    return memoryFactor;
1726  }
1727
1728  /**
1729   * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
1730   */
1731  static class RAMCache {
1732    /**
1733     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
1734     * {@link RAMCache#get(BlockCacheKey)} and
1735     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee
1736     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
1737     * func method can execute exactly once only when the key is present(or absent) and under the
1738     * lock context. Otherwise, the reference count of block will be messed up. Notice that the
1739     * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
1740     */
1741    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
1742
1743    public boolean containsKey(BlockCacheKey key) {
1744      return delegate.containsKey(key);
1745    }
1746
1747    public RAMQueueEntry get(BlockCacheKey key) {
1748      return delegate.computeIfPresent(key, (k, re) -> {
1749        // It'll be referenced by RPC, so retain atomically here. if the get and retain is not
1750        // atomic, another thread may remove and release the block, when retaining in this thread we
1751        // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422)
1752        re.getData().retain();
1753        return re;
1754      });
1755    }
1756
1757    /**
1758     * Return the previous associated value, or null if absent. It has the same meaning as
1759     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
1760     */
1761    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
1762      AtomicBoolean absent = new AtomicBoolean(false);
1763      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
1764        // The RAMCache reference to this entry, so reference count should be increment.
1765        entry.getData().retain();
1766        absent.set(true);
1767        return entry;
1768      });
1769      return absent.get() ? null : re;
1770    }
1771
1772    public boolean remove(BlockCacheKey key) {
1773      return remove(key, re -> {
1774      });
1775    }
1776
1777    /**
1778     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
1779     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
1780     * exception. the consumer will access entry to remove before release its reference count.
1781     * Notice, don't change its reference count in the {@link Consumer}
1782     */
1783    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
1784      RAMQueueEntry previous = delegate.remove(key);
1785      action.accept(previous);
1786      if (previous != null) {
1787        previous.getData().release();
1788      }
1789      return previous != null;
1790    }
1791
1792    public boolean isEmpty() {
1793      return delegate.isEmpty();
1794    }
1795
1796    public void clear() {
1797      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
1798      while (it.hasNext()) {
1799        RAMQueueEntry re = it.next().getValue();
1800        it.remove();
1801        re.getData().release();
1802      }
1803    }
1804  }
1805}