001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.Comparator;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.NavigableSet;
035import java.util.PriorityQueue;
036import java.util.Set;
037import java.util.concurrent.ArrayBlockingQueue;
038import java.util.concurrent.BlockingQueue;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ConcurrentSkipListSet;
042import java.util.concurrent.Executors;
043import java.util.concurrent.ScheduledExecutorService;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicBoolean;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.concurrent.atomic.LongAdder;
048import java.util.concurrent.locks.Lock;
049import java.util.concurrent.locks.ReentrantLock;
050import java.util.concurrent.locks.ReentrantReadWriteLock;
051import java.util.function.Consumer;
052import java.util.function.Function;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.Admin;
057import org.apache.hadoop.hbase.io.ByteBuffAllocator;
058import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
059import org.apache.hadoop.hbase.io.HeapSize;
060import org.apache.hadoop.hbase.io.hfile.BlockCache;
061import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
062import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
063import org.apache.hadoop.hbase.io.hfile.BlockPriority;
064import org.apache.hadoop.hbase.io.hfile.BlockType;
065import org.apache.hadoop.hbase.io.hfile.CacheStats;
066import org.apache.hadoop.hbase.io.hfile.Cacheable;
067import org.apache.hadoop.hbase.io.hfile.CachedBlock;
068import org.apache.hadoop.hbase.io.hfile.HFileBlock;
069import org.apache.hadoop.hbase.io.hfile.HFileContext;
070import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
071import org.apache.hadoop.hbase.nio.ByteBuff;
072import org.apache.hadoop.hbase.nio.RefCnt;
073import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.IdReadWriteLock;
077import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
078import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
079import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
080import org.apache.hadoop.util.StringUtils;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
086import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
087
088import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
089
090/**
091 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
092 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
093 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
094 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
095 * {@link FileIOEngine} to store/read the block data.
096 * <p>
097 * Eviction is via a similar algorithm as used in
098 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
099 * <p>
 * BucketCache can be used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to
 * decrease CMS GC and heap fragmentation.
103 * <p>
104 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
105 * enlarge cache space via a victim cache.
106 */
107@InterfaceAudience.Private
108public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /**
   * Priority buckets config: keys for the single/multi/memory split and for the eviction
   * thresholds (accept/min/extra-free). All of them are validated by sanityCheckConfigs().
   */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /**
   * Use strong reference for offsetLock or not. When true the constructor builds an
   * IdReadWriteLockStrongRef; otherwise (the default) an IdReadWriteLockWithObjectPool that holds
   * soft references.
   */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /**
   * Priority buckets: default split. The three factors add up to 1.0, as sanityCheckConfigs()
   * requires of the configured values.
   */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  // Kept below DEFAULT_ACCEPT_FACTOR: sanityCheckConfigs() requires minFactor <= acceptableFactor.
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full.
  // NOTE(review): not referenced in the visible part of this file; presumably used by freeSpace.
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Period, in seconds, at which the StatisticsThread is scheduled to log cache statistics. */
  private static final int statThreadPeriod = 5 * 60;

  // NOTE(review): presumably the default writer thread count and per-writer queue length for
  // callers that build a BucketCache from configuration; not referenced in the visible code.
  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Staging area: cacheBlockWithWaitInternal() puts each new block here before handing it to a
  // writer queue, and getBlock() serves blocks from here while they are still staged.
  transient final RAMCache ramCache;
  // In this map, store the block's meta data like offset, length. Not final.
  // NOTE(review): presumably reassigned when restoring from the persistence file; confirm.
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server. It is set to true near the
   * end of the constructor; caching, lookups and evictions are no-ops while it is false.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking hash of the cache key modulo queue count. WriterThread when
   * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It
   * then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  // NOTE(review): presumably serializes free-space passes together with freeInProgress; its use
  // is outside the visible part of this file.
  private transient final Lock freeSpaceLock = new ReentrantLock();

  // Presumably the bytes currently allocated in buckets; freeBucketEntry() subtracts the length
  // of each freed entry.
  private final LongAdder realCacheSize = new LongAdder();
  // Heap size of the blocks staged in ramCache: added when a block is queued for writing,
  // subtracted when it is removed from ramCache.
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID), stamped on entries when they are cached or read. */
  private final AtomicLong accessCount = new AtomicLong();

  // NOTE(review): not referenced in the visible part of this file; units not established here.
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  // File the cache is persisted to and restored from; persistence is only set up when this is
  // non-null and the IOEngine is persistent.
  private final String persistencePath;
  // Set by blockEvicted() when the IOEngine is persistent, presumably so the persister knows the
  // on-disk state is stale. NOTE(review): this is static (shared by every BucketCache in the JVM)
  // and not final; confirm that sharing across instances is intended.
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min, expressed in milliseconds
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write. -1 means no error is currently being tracked.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read: getBlock() holds the read lock while
   * reading an entry, and eviction takes the write lock before removing it from backingMap.
   */
  transient final IdReadWriteLock<Long> offsetLock;

  // Cached keys ordered by HFile name, then by offset. NOTE(review): keys with the same file name
  // and offset compare as equal here even if other BlockCacheKey fields differ.
  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
    if (nameComparison != 0) {
      return nameComparison;
    }
    return Long.compare(a.getOffset(), b.getOffset());
  });

  /**
   * Statistics thread schedule pool (for heavy debugging, could remove). A single daemon thread on
   * which the constructor schedules the StatisticsThread.
   */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  // Read from PREFETCH_PERSISTENCE_PATH_KEY in the constructor.
  private String prefetchedFileListPath;

  // Interval handed to the BucketCachePersister (constructor default 1000).
  // NOTE(review): presumably milliseconds; confirm against BucketCachePersister.
  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  // Timeout, in milliseconds, for offering a block to a full writer queue when the caller asks to
  // wait; cacheBlock(..., waitWhenCache) only waits when this is positive.
  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity, default algorithm is MD5
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
261
262  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
263    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
264    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
265      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
266  }
267
268  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
269    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
270    Configuration conf) throws IOException {
271    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
272    if (useStrongRef) {
273      this.offsetLock = new IdReadWriteLockStrongRef<>();
274    } else {
275      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
276    }
277    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
278    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
279    this.writerThreads = new WriterThread[writerThreadNum];
280    long blockNumCapacity = capacity / blockSize;
281    if (blockNumCapacity >= Integer.MAX_VALUE) {
282      // Enough for about 32TB of cache!
283      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
284    }
285
286    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
287    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
288    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
289    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
290    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
291    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
292    this.queueAdditionWaitTime =
293      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
294    this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
295    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
296
297    sanityCheckConfigs();
298
299    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
300      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
301      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
302      + ", useStrongRef: " + useStrongRef);
303
304    this.cacheCapacity = capacity;
305    this.persistencePath = persistencePath;
306    this.blockSize = blockSize;
307    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
308
309    this.allocFailLogPrevTs = 0;
310
311    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
312    for (int i = 0; i < writerThreads.length; ++i) {
313      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
314    }
315
316    assert writerQueues.size() == writerThreads.length;
317    this.ramCache = new RAMCache();
318
319    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
320
321    if (ioEngine.isPersistent() && persistencePath != null) {
322      startBucketCachePersisterThread();
323      try {
324        retrieveFromFile(bucketSizes);
325      } catch (IOException ioex) {
326        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
327      }
328    }
329    final String threadName = Thread.currentThread().getName();
330    this.cacheEnabled = true;
331    for (int i = 0; i < writerThreads.length; ++i) {
332      writerThreads[i] = new WriterThread(writerQueues.get(i));
333      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
334      writerThreads[i].setDaemon(true);
335    }
336    startWriterThreads();
337
338    // Run the statistics thread periodically to print the cache statistics log
339    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
340    // every five minutes.
341    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
342      statThreadPeriod, TimeUnit.SECONDS);
343    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
344      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
345      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
346      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
347  }
348
349  private void sanityCheckConfigs() {
350    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
351      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
352    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
353      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
354    Preconditions.checkArgument(minFactor <= acceptableFactor,
355      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
356    Preconditions.checkArgument(extraFreeFactor >= 0,
357      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
358    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
359      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
360    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
361      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
362    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
363      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
364    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
365      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
366        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
367  }
368
369  /**
370   * Called by the constructor to start the writer threads. Used by tests that need to override
371   * starting the threads.
372   */
373  protected void startWriterThreads() {
374    for (WriterThread thread : writerThreads) {
375      thread.start();
376    }
377  }
378
379  void startBucketCachePersisterThread() {
380    BucketCachePersister cachePersister =
381      new BucketCachePersister(this, bucketcachePersistInterval);
382    cachePersister.start();
383  }
384
385  boolean isCacheEnabled() {
386    return this.cacheEnabled;
387  }
388
389  @Override
390  public long getMaxSize() {
391    return this.cacheCapacity;
392  }
393
394  public String getIoEngine() {
395    return ioEngine.toString();
396  }
397
398  /**
399   * Get the IOEngine from the IO engine name
400   * @return the IOEngine
401   */
402  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
403    throws IOException {
404    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
405      // In order to make the usage simple, we only need the prefix 'files:' in
406      // document whether one or multiple file(s), but also support 'file:' for
407      // the compatibility
408      String[] filePaths =
409        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
410      return new FileIOEngine(capacity, persistencePath != null, filePaths);
411    } else if (ioEngineName.startsWith("offheap")) {
412      return new ByteBufferIOEngine(capacity);
413    } else if (ioEngineName.startsWith("mmap:")) {
414      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
415    } else if (ioEngineName.startsWith("pmem:")) {
416      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
417      // device. Since the persistent memory device has its own address space the contents
418      // mapped to this address space does not get swapped out like in the case of mmapping
419      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
420      // can be directly referred to without having to copy them onheap. Once the RPC is done,
421      // the blocks can be returned back as in case of ByteBufferIOEngine.
422      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
423    } else {
424      throw new IllegalArgumentException(
425        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
426    }
427  }
428
429  /**
430   * Cache the block with the specified name and buffer.
431   * @param cacheKey block's cache key
432   * @param buf      block buffer
433   */
434  @Override
435  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
436    cacheBlock(cacheKey, buf, false);
437  }
438
439  /**
440   * Cache the block with the specified name and buffer.
441   * @param cacheKey   block's cache key
442   * @param cachedItem block buffer
443   * @param inMemory   if block is in-memory
444   */
445  @Override
446  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
447    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
448  }
449
450  /**
451   * Cache the block with the specified name and buffer.
452   * @param cacheKey   block's cache key
453   * @param cachedItem block buffer
454   * @param inMemory   if block is in-memory
455   */
456  @Override
457  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
458    boolean waitWhenCache) {
459    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
460  }
461
462  /**
463   * Cache the block to ramCache
464   * @param cacheKey   block's cache key
465   * @param cachedItem block buffer
466   * @param inMemory   if block is in-memory
467   * @param wait       if true, blocking wait when queue is full
468   */
469  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
470    boolean wait) {
471    if (cacheEnabled) {
472      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
473        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
474          BucketEntry bucketEntry = backingMap.get(cacheKey);
475          if (bucketEntry != null && bucketEntry.isRpcRef()) {
476            // avoid replace when there are RPC refs for the bucket entry in bucket cache
477            return;
478          }
479          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
480        }
481      } else {
482        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
483      }
484    }
485  }
486
487  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
488    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
489  }
490
491  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
492    boolean inMemory, boolean wait) {
493    if (!cacheEnabled) {
494      return;
495    }
496    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
497      cacheKey.setBlockType(cachedItem.getBlockType());
498    }
499    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
500    // Stuff the entry into the RAM cache so it can get drained to the persistent store
501    RAMQueueEntry re =
502      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
503    /**
504     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
505     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
506     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
507     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
508     * one, then the heap size will mess up (HBASE-20789)
509     */
510    if (ramCache.putIfAbsent(cacheKey, re) != null) {
511      return;
512    }
513    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
514    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
515    boolean successfulAddition = false;
516    if (wait) {
517      try {
518        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
519      } catch (InterruptedException e) {
520        Thread.currentThread().interrupt();
521      }
522    } else {
523      successfulAddition = bq.offer(re);
524    }
525    if (!successfulAddition) {
526      ramCache.remove(cacheKey);
527      cacheStats.failInsert();
528    } else {
529      this.blockNumber.increment();
530      this.heapSize.add(cachedItem.heapSize());
531    }
532  }
533
534  /**
535   * Get the buffer of the block with the specified key.
536   * @param key                block's cache key
537   * @param caching            true if the caller caches blocks on cache misses
538   * @param repeat             Whether this is a repeat lookup for the same block
539   * @param updateCacheMetrics Whether we should update cache metrics or not
540   * @return buffer of specified cache key, or null if not in cache
541   */
542  @Override
543  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
544    boolean updateCacheMetrics) {
545    if (!cacheEnabled) {
546      return null;
547    }
548    RAMQueueEntry re = ramCache.get(key);
549    if (re != null) {
550      if (updateCacheMetrics) {
551        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
552      }
553      re.access(accessCount.incrementAndGet());
554      return re.getData();
555    }
556    BucketEntry bucketEntry = backingMap.get(key);
557    if (bucketEntry != null) {
558      long start = System.nanoTime();
559      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
560      try {
561        lock.readLock().lock();
562        // We can not read here even if backingMap does contain the given key because its offset
563        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
564        // existence here.
565        if (bucketEntry.equals(backingMap.get(key))) {
566          // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the
567          // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
568          // the same BucketEntry, then all of the three will share the same refCnt.
569          Cacheable cachedBlock = ioEngine.read(bucketEntry);
570          if (ioEngine.usesSharedMemory()) {
571            // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
572            // same RefCnt, do retain here, in order to count the number of RPC references
573            cachedBlock.retain();
574          }
575          // Update the cache statistics.
576          if (updateCacheMetrics) {
577            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
578            cacheStats.ioHit(System.nanoTime() - start);
579          }
580          bucketEntry.access(accessCount.incrementAndGet());
581          if (this.ioErrorStartTime > 0) {
582            ioErrorStartTime = -1;
583          }
584          return cachedBlock;
585        }
586      } catch (IOException ioex) {
587        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
588        checkIOErrorIsTolerated();
589      } finally {
590        lock.readLock().unlock();
591      }
592    }
593    if (!repeat && updateCacheMetrics) {
594      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
595    }
596    return null;
597  }
598
599  /**
600   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
601   */
602  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
603    boolean evictedByEvictionProcess) {
604    bucketEntry.markAsEvicted();
605    blocksByHFile.remove(cacheKey);
606    if (decrementBlockNumber) {
607      this.blockNumber.decrement();
608    }
609    if (evictedByEvictionProcess) {
610      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
611    }
612    if (ioEngine.isPersistent()) {
613      setCacheInconsistent(true);
614    }
615  }
616
617  /**
618   * Free the {{@link BucketEntry} actually,which could only be invoked when the
619   * {@link BucketEntry#refCnt} becoming 0.
620   */
621  void freeBucketEntry(BucketEntry bucketEntry) {
622    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
623    realCacheSize.add(-1 * bucketEntry.getLength());
624  }
625
626  /**
627   * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br>
628   * 1. Close an HFile, and clear all cached blocks. <br>
629   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
630   * <p>
631   * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
632   * Here we evict the block from backingMap immediately, but only free the reference from bucket
633   * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
634   * block, block can only be de-allocated when all of them release the block.
635   * <p>
636   * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
637   * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
638   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak.
639   * @param cacheKey Block to evict
640   * @return true to indicate whether we've evicted successfully or not.
641   */
642  @Override
643  public boolean evictBlock(BlockCacheKey cacheKey) {
644    return doEvictBlock(cacheKey, null, false);
645  }
646
647  /**
648   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
649   * {@link BucketCache#ramCache}. <br/>
650   * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
651   * {@link BucketEntry} could be removed.
652   * @param cacheKey    {@link BlockCacheKey} to evict.
653   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
654   * @return true to indicate whether we've evicted successfully or not.
655   */
656  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
657    boolean evictedByEvictionProcess) {
658    if (!cacheEnabled) {
659      return false;
660    }
661    boolean existedInRamCache = removeFromRamCache(cacheKey);
662    if (bucketEntry == null) {
663      bucketEntry = backingMap.get(cacheKey);
664    }
665    final BucketEntry bucketEntryToUse = bucketEntry;
666
667    if (bucketEntryToUse == null) {
668      if (existedInRamCache && evictedByEvictionProcess) {
669        cacheStats.evicted(0, cacheKey.isPrimary());
670      }
671      return existedInRamCache;
672    } else {
673      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
674        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
675          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
676          return true;
677        }
678        return false;
679      });
680    }
681  }
682
683  /**
684   * <pre>
685   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
686   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
687   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
688   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
689   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
690   *   it is the return value of current {@link BucketCache#createRecycler} method.
691   *
692   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
693   *   it is {@link ByteBuffAllocator#putbackBuffer}.
694   * </pre>
695   */
696  private Recycler createRecycler(final BucketEntry bucketEntry) {
697    return () -> {
698      freeBucketEntry(bucketEntry);
699      return;
700    };
701  }
702
703  /**
704   * NOTE: This method is only for test.
705   */
706  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
707    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
708    if (bucketEntry == null) {
709      return false;
710    }
711    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
712  }
713
714  /**
715   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
716   * {@link BucketEntry#isRpcRef} is false. <br/>
717   * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
718   * {@link BucketEntry} could be removed.
719   * @param blockCacheKey {@link BlockCacheKey} to evict.
720   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
721   * @return true to indicate whether we've evicted successfully or not.
722   */
723  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
724    if (!bucketEntry.isRpcRef()) {
725      return doEvictBlock(blockCacheKey, bucketEntry, true);
726    }
727    return false;
728  }
729
730  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
731    return ramCache.remove(cacheKey, re -> {
732      if (re != null) {
733        this.blockNumber.decrement();
734        this.heapSize.add(-1 * re.getData().heapSize());
735      }
736    });
737  }
738
739  public boolean isCacheInconsistent() {
740    return isCacheInconsistent.get();
741  }
742
743  public void setCacheInconsistent(boolean setCacheInconsistent) {
744    isCacheInconsistent.set(setCacheInconsistent);
745  }
746
747  /*
748   * Statistics thread. Periodically output cache statistics to the log.
749   */
750  private static class StatisticsThread extends Thread {
751    private final BucketCache bucketCache;
752
753    public StatisticsThread(BucketCache bucketCache) {
754      super("BucketCacheStatsThread");
755      setDaemon(true);
756      this.bucketCache = bucketCache;
757    }
758
759    @Override
760    public void run() {
761      bucketCache.logStats();
762    }
763  }
764
765  public void logStats() {
766    long totalSize = bucketAllocator.getTotalSize();
767    long usedSize = bucketAllocator.getUsedSize();
768    long freeSize = totalSize - usedSize;
769    long cacheSize = getRealCacheSize();
770    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
771      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
772      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
773      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
774      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
775      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
776      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
777      + (cacheStats.getHitCount() == 0
778        ? "0,"
779        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
780      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
781      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
782      + (cacheStats.getHitCachingCount() == 0
783        ? "0,"
784        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
785      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
786      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
787      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
788    cacheStats.reset();
789
790    bucketAllocator.logDebugStatistics();
791  }
792
793  public long getRealCacheSize() {
794    return this.realCacheSize.sum();
795  }
796
797  public long acceptableSize() {
798    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
799  }
800
801  long getPartitionSize(float partitionFactor) {
802    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
803  }
804
805  /**
806   * Return the count of bucketSizeinfos still need free space
807   */
808  private int bucketSizesAboveThresholdCount(float minFactor) {
809    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
810    int fullCount = 0;
811    for (int i = 0; i < stats.length; i++) {
812      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
813      freeGoal = Math.max(freeGoal, 1);
814      if (stats[i].freeCount() < freeGoal) {
815        fullCount++;
816      }
817    }
818    return fullCount;
819  }
820
821  /**
822   * This method will find the buckets that are minimally occupied and are not reference counted and
823   * will free them completely without any constraint on the access times of the elements, and as a
824   * process will completely free at most the number of buckets passed, sometimes it might not due
825   * to changing refCounts
826   * @param completelyFreeBucketsNeeded number of buckets to free
827   **/
828  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
829    if (completelyFreeBucketsNeeded != 0) {
830      // First we will build a set where the offsets are reference counted, usually
831      // this set is small around O(Handler Count) unless something else is wrong
832      Set<Integer> inUseBuckets = new HashSet<>();
833      backingMap.forEach((k, be) -> {
834        if (be.isRpcRef()) {
835          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
836        }
837      });
838      Set<Integer> candidateBuckets =
839        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
840      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
841        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
842          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
843        }
844      }
845    }
846  }
847
848  /**
849   * Free the space if the used size reaches acceptableSize() or one size block couldn't be
850   * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
851   * blocks evicted
852   * @param why Why we are being called
853   */
854  void freeSpace(final String why) {
855    // Ensure only one freeSpace progress at a time
856    if (!freeSpaceLock.tryLock()) {
857      return;
858    }
859    try {
860      freeInProgress = true;
861      long bytesToFreeWithoutExtra = 0;
862      // Calculate free byte for each bucketSizeinfo
863      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
864      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
865      long[] bytesToFreeForBucket = new long[stats.length];
866      for (int i = 0; i < stats.length; i++) {
867        bytesToFreeForBucket[i] = 0;
868        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
869        freeGoal = Math.max(freeGoal, 1);
870        if (stats[i].freeCount() < freeGoal) {
871          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
872          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
873          if (msgBuffer != null) {
874            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
875              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
876          }
877        }
878      }
879      if (msgBuffer != null) {
880        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
881      }
882
883      if (bytesToFreeWithoutExtra <= 0) {
884        return;
885      }
886      long currentSize = bucketAllocator.getUsedSize();
887      long totalSize = bucketAllocator.getTotalSize();
888      if (LOG.isDebugEnabled() && msgBuffer != null) {
889        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
890          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
891          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
892          + StringUtils.byteDesc(totalSize));
893      }
894
895      long bytesToFreeWithExtra =
896        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
897
898      // Instantiate priority buckets
899      BucketEntryGroup bucketSingle =
900        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
901      BucketEntryGroup bucketMulti =
902        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
903      BucketEntryGroup bucketMemory =
904        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
905
906      // Scan entire map putting bucket entry into appropriate bucket entry
907      // group
908      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
909        switch (bucketEntryWithKey.getValue().getPriority()) {
910          case SINGLE: {
911            bucketSingle.add(bucketEntryWithKey);
912            break;
913          }
914          case MULTI: {
915            bucketMulti.add(bucketEntryWithKey);
916            break;
917          }
918          case MEMORY: {
919            bucketMemory.add(bucketEntryWithKey);
920            break;
921          }
922        }
923      }
924
925      PriorityQueue<BucketEntryGroup> bucketQueue =
926        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
927
928      bucketQueue.add(bucketSingle);
929      bucketQueue.add(bucketMulti);
930      bucketQueue.add(bucketMemory);
931
932      int remainingBuckets = bucketQueue.size();
933      long bytesFreed = 0;
934
935      BucketEntryGroup bucketGroup;
936      while ((bucketGroup = bucketQueue.poll()) != null) {
937        long overflow = bucketGroup.overflow();
938        if (overflow > 0) {
939          long bucketBytesToFree =
940            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
941          bytesFreed += bucketGroup.free(bucketBytesToFree);
942        }
943        remainingBuckets--;
944      }
945
946      // Check and free if there are buckets that still need freeing of space
947      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
948        bucketQueue.clear();
949        remainingBuckets = 3;
950
951        bucketQueue.add(bucketSingle);
952        bucketQueue.add(bucketMulti);
953        bucketQueue.add(bucketMemory);
954
955        while ((bucketGroup = bucketQueue.poll()) != null) {
956          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
957          bytesFreed += bucketGroup.free(bucketBytesToFree);
958          remainingBuckets--;
959        }
960      }
961
962      // Even after the above free we might still need freeing because of the
963      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
964      // there might be some buckets where the occupancy is very sparse and thus are not
965      // yielding the free for the other bucket sizes, the fix for this to evict some
966      // of the buckets, we do this by evicting the buckets that are least fulled
967      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));
968
969      if (LOG.isDebugEnabled()) {
970        long single = bucketSingle.totalSize();
971        long multi = bucketMulti.totalSize();
972        long memory = bucketMemory.totalSize();
973        if (LOG.isDebugEnabled()) {
974          LOG.debug("Bucket cache free space completed; " + "freed="
975            + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
976            + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
977            + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
978        }
979      }
980
981    } catch (Throwable t) {
982      LOG.warn("Failed freeing space", t);
983    } finally {
984      cacheStats.evict();
985      freeInProgress = false;
986      freeSpaceLock.unlock();
987    }
988  }
989
990  // This handles flushing the RAM cache to IOEngine.
991  class WriterThread extends Thread {
992    private final BlockingQueue<RAMQueueEntry> inputQueue;
993    private volatile boolean writerEnabled = true;
994    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
995
996    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
997      super("BucketCacheWriterThread");
998      this.inputQueue = queue;
999    }
1000
1001    // Used for test
1002    void disableWriter() {
1003      this.writerEnabled = false;
1004    }
1005
1006    @Override
1007    public void run() {
1008      List<RAMQueueEntry> entries = new ArrayList<>();
1009      try {
1010        while (cacheEnabled && writerEnabled) {
1011          try {
1012            try {
1013              // Blocks
1014              entries = getRAMQueueEntries(inputQueue, entries);
1015            } catch (InterruptedException ie) {
1016              if (!cacheEnabled || !writerEnabled) {
1017                break;
1018              }
1019            }
1020            doDrain(entries, metaBuff);
1021          } catch (Exception ioe) {
1022            LOG.error("WriterThread encountered error", ioe);
1023          }
1024        }
1025      } catch (Throwable t) {
1026        LOG.warn("Failed doing drain", t);
1027      }
1028      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
1029    }
1030  }
1031
1032  /**
1033   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
1034   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
1035   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
1036   * with the same cache key do the same thing for the same cache key, so if not evict the previous
1037   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
1038   * bucketAllocator do not free its memory.
1039   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
1040   *      cacheKey, Cacheable newBlock)
1041   * @param key         Block cache key
1042   * @param bucketEntry Bucket entry to put into backingMap.
1043   */
1044  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
1045    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
1046    if (previousEntry != null && previousEntry != bucketEntry) {
1047      previousEntry.withWriteLock(offsetLock, () -> {
1048        blockEvicted(key, previousEntry, false, false);
1049        return null;
1050      });
1051    }
1052  }
1053
1054  /**
1055   * Prepare and return a warning message for Bucket Allocator Exception
1056   * @param fle The exception
1057   * @param re  The RAMQueueEntry for which the exception was thrown.
1058   * @return A warning message created from the input RAMQueueEntry object.
1059   */
1060  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1061    final RAMQueueEntry re) {
1062    final StringBuilder sb = new StringBuilder();
1063    sb.append("Most recent failed allocation after ");
1064    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1065    sb.append(" ms;");
1066    if (re != null) {
1067      if (re.getData() instanceof HFileBlock) {
1068        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1069        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1070        final String tableName = Bytes.toString(fileContext.getTableName());
1071        if (tableName != null) {
1072          sb.append(" Table: ");
1073          sb.append(tableName);
1074        }
1075        if (columnFamily != null) {
1076          sb.append(" CF: ");
1077          sb.append(columnFamily);
1078        }
1079        sb.append(" HFile: ");
1080        if (fileContext.getHFileName() != null) {
1081          sb.append(fileContext.getHFileName());
1082        } else {
1083          sb.append(re.getKey());
1084        }
1085      } else {
1086        sb.append(" HFile: ");
1087        sb.append(re.getKey());
1088      }
1089    }
1090    sb.append(" Message: ");
1091    sb.append(fle.getMessage());
1092    return sb.toString();
1093  }
1094
1095  /**
1096   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
1097   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
1098   * references and we'll OOME.
1099   * @param entries Presumes list passed in here will be processed by this invocation only. No
1100   *                interference expected.
1101   */
1102  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1103    if (entries.isEmpty()) {
1104      return;
1105    }
1106    // This method is a little hard to follow. We run through the passed in entries and for each
1107    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1108    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1109    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
1110    // filling ramCache. We do the clean up by again running through the passed in entries
1111    // doing extra work when we find a non-null bucketEntries corresponding entry.
1112    final int size = entries.size();
1113    BucketEntry[] bucketEntries = new BucketEntry[size];
1114    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1115    // when we go to add an entry by going around the loop again without upping the index.
1116    int index = 0;
1117    while (cacheEnabled && index < size) {
1118      RAMQueueEntry re = null;
1119      try {
1120        re = entries.get(index);
1121        if (re == null) {
1122          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1123          index++;
1124          continue;
1125        }
1126        BlockCacheKey cacheKey = re.getKey();
1127        if (ramCache.containsKey(cacheKey)) {
1128          blocksByHFile.add(cacheKey);
1129        }
1130        // Reset the position for reuse.
1131        // It should be guaranteed that the data in the metaBuff has been transferred to the
1132        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1133        // transferred with our current IOEngines. Should take care, when we have new kinds of
1134        // IOEngine in the future.
1135        metaBuff.clear();
1136        BucketEntry bucketEntry =
1137          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
1138        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1139        bucketEntries[index] = bucketEntry;
1140        if (ioErrorStartTime > 0) {
1141          ioErrorStartTime = -1;
1142        }
1143        index++;
1144      } catch (BucketAllocatorException fle) {
1145        long currTs = EnvironmentEdgeManager.currentTime();
1146        cacheStats.allocationFailed(); // Record the warning.
1147        if (
1148          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1149        ) {
1150          LOG.warn(getAllocationFailWarningMessage(fle, re));
1151          allocFailLogPrevTs = currTs;
1152        }
1153        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1154        bucketEntries[index] = null;
1155        index++;
1156      } catch (CacheFullException cfe) {
1157        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1158        if (!freeInProgress) {
1159          freeSpace("Full!");
1160        } else {
1161          Thread.sleep(50);
1162        }
1163      } catch (IOException ioex) {
1164        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1165        LOG.error("Failed writing to bucket cache", ioex);
1166        checkIOErrorIsTolerated();
1167      }
1168    }
1169
1170    // Make sure data pages are written on media before we update maps.
1171    try {
1172      ioEngine.sync();
1173    } catch (IOException ioex) {
1174      LOG.error("Failed syncing IO engine", ioex);
1175      checkIOErrorIsTolerated();
1176      // Since we failed sync, free the blocks in bucket allocator
1177      for (int i = 0; i < entries.size(); ++i) {
1178        BucketEntry bucketEntry = bucketEntries[i];
1179        if (bucketEntry != null) {
1180          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
1181          bucketEntries[i] = null;
1182        }
1183      }
1184    }
1185
1186    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1187    // success or error.
1188    for (int i = 0; i < size; ++i) {
1189      BlockCacheKey key = entries.get(i).getKey();
1190      // Only add if non-null entry.
1191      if (bucketEntries[i] != null) {
1192        putIntoBackingMap(key, bucketEntries[i]);
1193        if (ioEngine.isPersistent()) {
1194          setCacheInconsistent(true);
1195        }
1196      }
1197      // Always remove from ramCache even if we failed adding it to the block cache above.
1198      boolean existed = ramCache.remove(key, re -> {
1199        if (re != null) {
1200          heapSize.add(-1 * re.getData().heapSize());
1201        }
1202      });
1203      if (!existed && bucketEntries[i] != null) {
1204        // Block should have already been evicted. Remove it and free space.
1205        final BucketEntry bucketEntry = bucketEntries[i];
1206        bucketEntry.withWriteLock(offsetLock, () -> {
1207          if (backingMap.remove(key, bucketEntry)) {
1208            blockEvicted(key, bucketEntry, false, false);
1209          }
1210          return null;
1211        });
1212      }
1213    }
1214
1215    long used = bucketAllocator.getUsedSize();
1216    if (used > acceptableSize()) {
1217      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1218    }
1219    return;
1220  }
1221
1222  /**
1223   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1224   * returning.
1225   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1226   *                   in case.
1227   * @param q          The queue to take from.
1228   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1229   */
1230  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1231    List<RAMQueueEntry> receptacle) throws InterruptedException {
1232    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1233    // ok even if list grew to accommodate thousands.
1234    receptacle.clear();
1235    receptacle.add(q.take());
1236    q.drainTo(receptacle);
1237    return receptacle;
1238  }
1239
1240  /**
1241   * @see #retrieveFromFile(int[])
1242   */
1243  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1244      justification = "false positive, try-with-resources ensures close is called.")
1245  void persistToFile() throws IOException {
1246    if (!ioEngine.isPersistent()) {
1247      throw new IOException("Attempt to persist non-persistent cache mappings!");
1248    }
1249    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
1250      fos.write(ProtobufMagic.PB_MAGIC);
1251      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
1252    }
1253    if (prefetchedFileListPath != null) {
1254      PrefetchExecutor.persistToFile(prefetchedFileListPath);
1255    }
1256  }
1257
1258  /**
1259   * @see #persistToFile()
1260   */
1261  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1262    File persistenceFile = new File(persistencePath);
1263    if (!persistenceFile.exists()) {
1264      return;
1265    }
1266    assert !cacheEnabled;
1267    if (prefetchedFileListPath != null) {
1268      PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
1269    }
1270
1271    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
1272      int pblen = ProtobufMagic.lengthOfPBMagic();
1273      byte[] pbuf = new byte[pblen];
1274      int read = in.read(pbuf);
1275      if (read != pblen) {
1276        throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
1277          + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
1278      }
1279      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
1280        // In 3.0 we have enough flexibility to dump the old cache data.
1281        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1282        throw new IOException(
1283          "Persistence file does not start with protobuf magic number. " + persistencePath);
1284      }
1285      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1286      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1287      blockNumber.add(backingMap.size());
1288    }
1289  }
1290
1291  /**
1292   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
1293   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
1294   *
1295   * <pre>
1296   *   File f = ...
1297   *   try (FileInputStream fis = new FileInputStream(f)) {
1298   *     // use the input stream
1299   *   } finally {
1300   *     if (!f.delete()) throw new IOException("failed to delete");
1301   *   }
1302   * </pre>
1303   *
1304   * @param file the file to read and delete
1305   * @return a FileInputStream for the given file
1306   * @throws IOException if there is a problem creating the stream
1307   */
1308  private FileInputStream deleteFileOnClose(final File file) throws IOException {
1309    return new FileInputStream(file) {
1310      private File myFile;
1311
1312      private FileInputStream init(File file) {
1313        myFile = file;
1314        return this;
1315      }
1316
1317      @Override
1318      public void close() throws IOException {
1319        // close() will be called during try-with-resources and it will be
1320        // called by finalizer thread during GC. To avoid double-free resource,
1321        // set myFile to null after the first call.
1322        if (myFile == null) {
1323          return;
1324        }
1325
1326        super.close();
1327        if (!myFile.delete()) {
1328          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
1329        }
1330        myFile = null;
1331      }
1332    }.init(file);
1333  }
1334
1335  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1336    throws IOException {
1337    if (capacitySize != cacheCapacity) {
1338      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1339        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1340    }
1341    if (!ioEngine.getClass().getName().equals(ioclass)) {
1342      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1343        + ioEngine.getClass().getName());
1344    }
1345    if (!backingMap.getClass().getName().equals(mapclass)) {
1346      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1347        + backingMap.getClass().getName());
1348    }
1349  }
1350
1351  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1352    if (proto.hasChecksum()) {
1353      ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1354        algorithm);
1355    } else {
1356      // if has not checksum, it means the persistence file is old format
1357      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
1358    }
1359    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1360    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1361      this::createRecycler);
1362  }
1363
1364  /**
1365   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1366   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1367   */
1368  private void checkIOErrorIsTolerated() {
1369    long now = EnvironmentEdgeManager.currentTime();
1370    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1371    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1372    if (ioErrorStartTimeTmp > 0) {
1373      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1374        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1375          + "ms, disabling cache, please check your IOEngine");
1376        disableCache();
1377      }
1378    } else {
1379      this.ioErrorStartTime = now;
1380    }
1381  }
1382
1383  /**
1384   * Used to shut down the cache -or- turn it off in the case of something broken.
1385   */
1386  private void disableCache() {
1387    if (!cacheEnabled) return;
1388    cacheEnabled = false;
1389    ioEngine.shutdown();
1390    this.scheduleThreadPool.shutdown();
1391    for (int i = 0; i < writerThreads.length; ++i)
1392      writerThreads[i].interrupt();
1393    this.ramCache.clear();
1394    if (!ioEngine.isPersistent() || persistencePath == null) {
1395      // If persistent ioengine and a path, we will serialize out the backingMap.
1396      this.backingMap.clear();
1397    }
1398  }
1399
1400  private void join() throws InterruptedException {
1401    for (int i = 0; i < writerThreads.length; ++i)
1402      writerThreads[i].join();
1403  }
1404
1405  @Override
1406  public void shutdown() {
1407    disableCache();
1408    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1409      + persistencePath);
1410    if (ioEngine.isPersistent() && persistencePath != null) {
1411      try {
1412        join();
1413        persistToFile();
1414      } catch (IOException ex) {
1415        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1416      } catch (InterruptedException e) {
1417        LOG.warn("Failed to persist data on exit", e);
1418      }
1419    }
1420  }
1421
1422  @Override
1423  public CacheStats getStats() {
1424    return cacheStats;
1425  }
1426
1427  public BucketAllocator getAllocator() {
1428    return this.bucketAllocator;
1429  }
1430
1431  @Override
1432  public long heapSize() {
1433    return this.heapSize.sum();
1434  }
1435
1436  @Override
1437  public long size() {
1438    return this.realCacheSize.sum();
1439  }
1440
1441  @Override
1442  public long getCurrentDataSize() {
1443    return size();
1444  }
1445
1446  @Override
1447  public long getFreeSize() {
1448    return this.bucketAllocator.getFreeSize();
1449  }
1450
1451  @Override
1452  public long getBlockCount() {
1453    return this.blockNumber.sum();
1454  }
1455
1456  @Override
1457  public long getDataBlockCount() {
1458    return getBlockCount();
1459  }
1460
1461  @Override
1462  public long getCurrentSize() {
1463    return this.bucketAllocator.getUsedSize();
1464  }
1465
1466  protected String getAlgorithm() {
1467    return algorithm;
1468  }
1469
1470  /**
1471   * Evicts all blocks for a specific HFile.
1472   * <p>
1473   * This is used for evict-on-close to remove all blocks of a specific HFile.
1474   * @return the number of blocks evicted
1475   */
1476  @Override
1477  public int evictBlocksByHfileName(String hfileName) {
1478    PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
1479    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
1480      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1481
1482    int numEvicted = 0;
1483    for (BlockCacheKey key : keySet) {
1484      if (evictBlock(key)) {
1485        ++numEvicted;
1486      }
1487    }
1488
1489    return numEvicted;
1490  }
1491
1492  /**
1493   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1494   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1495   * number of elements out of each according to configuration parameters and their relative sizes.
1496   */
1497  private class BucketEntryGroup {
1498
1499    private CachedEntryQueue queue;
1500    private long totalSize = 0;
1501    private long bucketSize;
1502
1503    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1504      this.bucketSize = bucketSize;
1505      queue = new CachedEntryQueue(bytesToFree, blockSize);
1506      totalSize = 0;
1507    }
1508
1509    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1510      totalSize += block.getValue().getLength();
1511      queue.add(block);
1512    }
1513
1514    public long free(long toFree) {
1515      Map.Entry<BlockCacheKey, BucketEntry> entry;
1516      long freedBytes = 0;
1517      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1518      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1519      while ((entry = queue.pollLast()) != null) {
1520        BlockCacheKey blockCacheKey = entry.getKey();
1521        BucketEntry be = entry.getValue();
1522        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1523          freedBytes += be.getLength();
1524        }
1525        if (freedBytes >= toFree) {
1526          return freedBytes;
1527        }
1528      }
1529      return freedBytes;
1530    }
1531
1532    public long overflow() {
1533      return totalSize - bucketSize;
1534    }
1535
1536    public long totalSize() {
1537      return totalSize;
1538    }
1539  }
1540
1541  /**
1542   * Block Entry stored in the memory with key,data and so on
1543   */
1544  static class RAMQueueEntry {
1545    private final BlockCacheKey key;
1546    private final Cacheable data;
1547    private long accessCounter;
1548    private boolean inMemory;
1549
1550    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
1551      this.key = bck;
1552      this.data = data;
1553      this.accessCounter = accessCounter;
1554      this.inMemory = inMemory;
1555    }
1556
1557    public Cacheable getData() {
1558      return data;
1559    }
1560
1561    public BlockCacheKey getKey() {
1562      return key;
1563    }
1564
1565    public void access(long accessCounter) {
1566      this.accessCounter = accessCounter;
1567    }
1568
1569    private ByteBuffAllocator getByteBuffAllocator() {
1570      if (data instanceof HFileBlock) {
1571        return ((HFileBlock) data).getByteBuffAllocator();
1572      }
1573      return ByteBuffAllocator.HEAP;
1574    }
1575
1576    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
1577      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
1578      ByteBuffer metaBuff) throws IOException {
1579      int len = data.getSerializedLength();
1580      // This cacheable thing can't be serialized
1581      if (len == 0) {
1582        return null;
1583      }
1584      long offset = alloc.allocateBlock(len);
1585      boolean succ = false;
1586      BucketEntry bucketEntry = null;
1587      try {
1588        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
1589          getByteBuffAllocator());
1590        bucketEntry.setDeserializerReference(data.getDeserializer());
1591        if (data instanceof HFileBlock) {
1592          // If an instance of HFileBlock, save on some allocations.
1593          HFileBlock block = (HFileBlock) data;
1594          ByteBuff sliceBuf = block.getBufferReadOnly();
1595          block.getMetaData(metaBuff);
1596          ioEngine.write(sliceBuf, offset);
1597          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
1598        } else {
1599          // Only used for testing.
1600          ByteBuffer bb = ByteBuffer.allocate(len);
1601          data.serialize(bb, true);
1602          ioEngine.write(bb, offset);
1603        }
1604        succ = true;
1605      } finally {
1606        if (!succ) {
1607          alloc.freeBlock(offset, len);
1608        }
1609      }
1610      realCacheSize.add(len);
1611      return bucketEntry;
1612    }
1613  }
1614
1615  /**
1616   * Only used in test
1617   */
1618  void stopWriterThreads() throws InterruptedException {
1619    for (WriterThread writerThread : writerThreads) {
1620      writerThread.disableWriter();
1621      writerThread.interrupt();
1622      writerThread.join();
1623    }
1624  }
1625
1626  @Override
1627  public Iterator<CachedBlock> iterator() {
1628    // Don't bother with ramcache since stuff is in here only a little while.
1629    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
1630    return new Iterator<CachedBlock>() {
1631      private final long now = System.nanoTime();
1632
1633      @Override
1634      public boolean hasNext() {
1635        return i.hasNext();
1636      }
1637
1638      @Override
1639      public CachedBlock next() {
1640        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1641        return new CachedBlock() {
1642          @Override
1643          public String toString() {
1644            return BlockCacheUtil.toString(this, now);
1645          }
1646
1647          @Override
1648          public BlockPriority getBlockPriority() {
1649            return e.getValue().getPriority();
1650          }
1651
1652          @Override
1653          public BlockType getBlockType() {
1654            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
1655            return null;
1656          }
1657
1658          @Override
1659          public long getOffset() {
1660            return e.getKey().getOffset();
1661          }
1662
1663          @Override
1664          public long getSize() {
1665            return e.getValue().getLength();
1666          }
1667
1668          @Override
1669          public long getCachedTime() {
1670            return e.getValue().getCachedTime();
1671          }
1672
1673          @Override
1674          public String getFilename() {
1675            return e.getKey().getHfileName();
1676          }
1677
1678          @Override
1679          public int compareTo(CachedBlock other) {
1680            int diff = this.getFilename().compareTo(other.getFilename());
1681            if (diff != 0) return diff;
1682
1683            diff = Long.compare(this.getOffset(), other.getOffset());
1684            if (diff != 0) return diff;
1685            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1686              throw new IllegalStateException(
1687                "" + this.getCachedTime() + ", " + other.getCachedTime());
1688            }
1689            return Long.compare(other.getCachedTime(), this.getCachedTime());
1690          }
1691
1692          @Override
1693          public int hashCode() {
1694            return e.getKey().hashCode();
1695          }
1696
1697          @Override
1698          public boolean equals(Object obj) {
1699            if (obj instanceof CachedBlock) {
1700              CachedBlock cb = (CachedBlock) obj;
1701              return compareTo(cb) == 0;
1702            } else {
1703              return false;
1704            }
1705          }
1706        };
1707      }
1708
1709      @Override
1710      public void remove() {
1711        throw new UnsupportedOperationException();
1712      }
1713    };
1714  }
1715
1716  @Override
1717  public BlockCache[] getBlockCaches() {
1718    return null;
1719  }
1720
1721  public int getRpcRefCount(BlockCacheKey cacheKey) {
1722    BucketEntry bucketEntry = backingMap.get(cacheKey);
1723    if (bucketEntry != null) {
1724      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
1725    }
1726    return 0;
1727  }
1728
1729  float getAcceptableFactor() {
1730    return acceptableFactor;
1731  }
1732
1733  float getMinFactor() {
1734    return minFactor;
1735  }
1736
1737  float getExtraFreeFactor() {
1738    return extraFreeFactor;
1739  }
1740
1741  float getSingleFactor() {
1742    return singleFactor;
1743  }
1744
1745  float getMultiFactor() {
1746    return multiFactor;
1747  }
1748
1749  float getMemoryFactor() {
1750    return memoryFactor;
1751  }
1752
1753  /**
1754   * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
1755   */
1756  static class RAMCache {
1757    /**
1758     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
1759     * {@link RAMCache#get(BlockCacheKey)} and
1760     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee
1761     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
1762     * func method can execute exactly once only when the key is present(or absent) and under the
1763     * lock context. Otherwise, the reference count of block will be messed up. Notice that the
1764     * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
1765     */
1766    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
1767
1768    public boolean containsKey(BlockCacheKey key) {
1769      return delegate.containsKey(key);
1770    }
1771
1772    public RAMQueueEntry get(BlockCacheKey key) {
1773      return delegate.computeIfPresent(key, (k, re) -> {
1774        // It'll be referenced by RPC, so retain atomically here. if the get and retain is not
1775        // atomic, another thread may remove and release the block, when retaining in this thread we
1776        // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422)
1777        re.getData().retain();
1778        return re;
1779      });
1780    }
1781
1782    /**
1783     * Return the previous associated value, or null if absent. It has the same meaning as
1784     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
1785     */
1786    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
1787      AtomicBoolean absent = new AtomicBoolean(false);
1788      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
1789        // The RAMCache reference to this entry, so reference count should be increment.
1790        entry.getData().retain();
1791        absent.set(true);
1792        return entry;
1793      });
1794      return absent.get() ? null : re;
1795    }
1796
1797    public boolean remove(BlockCacheKey key) {
1798      return remove(key, re -> {
1799      });
1800    }
1801
1802    /**
1803     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
1804     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
1805     * exception. the consumer will access entry to remove before release its reference count.
1806     * Notice, don't change its reference count in the {@link Consumer}
1807     */
1808    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
1809      RAMQueueEntry previous = delegate.remove(key);
1810      action.accept(previous);
1811      if (previous != null) {
1812        previous.getData().release();
1813      }
1814      return previous != null;
1815    }
1816
1817    public boolean isEmpty() {
1818      return delegate.isEmpty();
1819    }
1820
1821    public void clear() {
1822      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
1823      while (it.hasNext()) {
1824        RAMQueueEntry re = it.next().getValue();
1825        it.remove();
1826        re.getData().release();
1827      }
1828    }
1829  }
1830}