001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIODS;
021import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIOD_MINUTES_KEY;
022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIODS;
023import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
025
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.FileOutputStream;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Comparator;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.NavigableSet;
040import java.util.Optional;
041import java.util.PriorityQueue;
042import java.util.Set;
043import java.util.concurrent.ArrayBlockingQueue;
044import java.util.concurrent.BlockingQueue;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ConcurrentMap;
047import java.util.concurrent.ConcurrentSkipListSet;
048import java.util.concurrent.Executors;
049import java.util.concurrent.ScheduledExecutorService;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.atomic.AtomicBoolean;
052import java.util.concurrent.atomic.AtomicLong;
053import java.util.concurrent.atomic.LongAdder;
054import java.util.concurrent.locks.Lock;
055import java.util.concurrent.locks.ReentrantLock;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057import java.util.function.Consumer;
058import java.util.function.Function;
059import org.apache.commons.io.IOUtils;
060import org.apache.commons.lang3.mutable.MutableInt;
061import org.apache.hadoop.conf.Configuration;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.hbase.HBaseConfiguration;
064import org.apache.hadoop.hbase.HBaseIOException;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.client.Admin;
067import org.apache.hadoop.hbase.io.ByteBuffAllocator;
068import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
069import org.apache.hadoop.hbase.io.HeapSize;
070import org.apache.hadoop.hbase.io.hfile.BlockCache;
071import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
072import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
073import org.apache.hadoop.hbase.io.hfile.BlockPriority;
074import org.apache.hadoop.hbase.io.hfile.BlockType;
075import org.apache.hadoop.hbase.io.hfile.CacheConfig;
076import org.apache.hadoop.hbase.io.hfile.CacheStats;
077import org.apache.hadoop.hbase.io.hfile.Cacheable;
078import org.apache.hadoop.hbase.io.hfile.CachedBlock;
079import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
080import org.apache.hadoop.hbase.io.hfile.HFileBlock;
081import org.apache.hadoop.hbase.io.hfile.HFileContext;
082import org.apache.hadoop.hbase.io.hfile.HFileInfo;
083import org.apache.hadoop.hbase.nio.ByteBuff;
084import org.apache.hadoop.hbase.nio.RefCnt;
085import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
086import org.apache.hadoop.hbase.regionserver.DataTieringManager;
087import org.apache.hadoop.hbase.regionserver.HRegion;
088import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
089import org.apache.hadoop.hbase.util.Bytes;
090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
091import org.apache.hadoop.hbase.util.HFileArchiveUtil;
092import org.apache.hadoop.hbase.util.IdReadWriteLock;
093import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
094import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
095import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
096import org.apache.hadoop.hbase.util.Pair;
097import org.apache.hadoop.util.StringUtils;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
103import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
104
105import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
106
107/**
108 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
109 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
110 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
111 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
112 * {@link FileIOEngine} to store/read the block data.
113 * <p>
114 * Eviction is via a similar algorithm as used in
115 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
116 * <p>
 * BucketCache is mainly used as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with an on-heap
 * BlockCache to decrease CMS GC and heap fragmentation.
120 * <p>
121 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
122 * enlarge cache space via a victim cache.
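 * <p>
 * As a minimal illustrative sketch (the path and sizes below are made-up examples, not
 * recommendations), a file-backed instance can be created directly through the public constructor;
 * in normal deployments the cache is instead created by the block cache factory from configuration:
 *
 * <pre>{@code
 * long capacity = 8L * 1024 * 1024 * 1024;   // 8 GiB of cache space
 * int blockSize = 64 * 1024;                 // approximate block size
 * // null bucket sizes use the allocator defaults; 3 writer threads, 64 queue slots,
 * // no persistence path; the constructor throws IOException if the engine cannot be created
 * BucketCache cache =
 *   new BucketCache("file:/mnt/ssd/bucketcache.data", capacity, blockSize, null, 3, 64, null);
 * }</pre>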
123 */
124@InterfaceAudience.Private
125public class BucketCache implements BlockCache, HeapSize {
126  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);
127
128  /** Priority buckets config */
129  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
130  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
131  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
132  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
133  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
134  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
135  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
136    "hbase.bucketcache.persistence.chunksize";
137
138  /** Use strong reference for offsetLock or not */
139  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
140  private static final boolean STRONG_REF_DEFAULT = false;
141
  /**
   * Minimum cached age of a block before we check whether its file still belongs to any online
   * region.
   */
143  static final String BLOCK_ORPHAN_GRACE_PERIOD =
144    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";
145
146  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;
147
148  /** Priority buckets */
149  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
150  static final float DEFAULT_MULTI_FACTOR = 0.50f;
151  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
152  static final float DEFAULT_MIN_FACTOR = 0.85f;
153
154  static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
155  static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
156
  // Number of buckets to completely free for each bucket size that is full
158  static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
159
160  /** Statistics thread */
161  private static final int statThreadPeriod = 5 * 60;
162
163  final static int DEFAULT_WRITER_THREADS = 3;
164  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
165
166  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000;
167
168  // Store/read block data
169  transient final IOEngine ioEngine;
170
171  // Store the block in this map before writing it to cache
172  transient final RAMCache ramCache;
173
  // In this map, store the block's metadata, such as offset and length
175  transient Map<BlockCacheKey, BucketEntry> backingMap;
176
177  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);
178
179  /**
   * Map of hFile -> Region -> File size. This map is used to track all files that have completed
   * prefetch, together with the region they belong to and the total cached size for the region.
183   */
184  transient final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
185  /**
186   * Map of region -> total size of the region prefetched on this region server. This is the total
187   * size of hFiles for this region prefetched on this region server
188   */
189  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();
190
191  private transient BucketCachePersister cachePersister;
192
193  /**
194   * Enum to represent the state of cache
195   */
196  protected enum CacheState {
197    // Initializing: State when the cache is being initialised from persistence.
198    INITIALIZING,
199    // Enabled: State when cache is initialised and is ready.
200    ENABLED,
201    // Disabled: State when the cache is disabled.
202    DISABLED
203  }
204
205  /**
206   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
207   * that Bucket IO exceptions/errors don't bring down the HBase server.
208   */
209  private volatile CacheState cacheState;
210
211  /**
212   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
213   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has recently been added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
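   * For example, with three writer queues a cache key whose hash code is 7 is routed to queue
   * {@code (7 & 0x7FFFFFFF) % 3}, i.e. queue 1, mirroring the computation in
   * cacheBlockWithWaitInternal.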
217   */
218  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
219  transient final WriterThread[] writerThreads;
220
221  /** Volatile boolean to track if free space is in process or not */
222  private volatile boolean freeInProgress = false;
223  private transient final Lock freeSpaceLock = new ReentrantLock();
224
225  private final LongAdder realCacheSize = new LongAdder();
226  private final LongAdder heapSize = new LongAdder();
227  /** Current number of cached elements */
228  private final LongAdder blockNumber = new LongAdder();
229
230  /** Cache access count (sequential ID) */
231  private final AtomicLong accessCount = new AtomicLong();
232
233  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
234
235  private final BucketCacheStats cacheStats;
236  private final String persistencePath;
237  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
238  private final long cacheCapacity;
239  /** Approximate block size */
240  private final long blockSize;
241
242  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
243  private final int ioErrorsTolerationDuration;
244  // 1 min
245  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
246
247  // Start time of first IO error when reading or writing IO Engine, it will be
248  // reset after a successful read/write.
249  private volatile long ioErrorStartTime = -1;
250
251  private transient Configuration conf;
252
253  /**
254   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
255   * this is to avoid freeing the block which is being read.
256   * <p>
257   */
258  transient final IdReadWriteLock<Long> offsetLock;
259
260  transient NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
261    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
262
263  /** Statistics thread schedule pool (for heavy debugging, could remove) */
264  private transient final ScheduledExecutorService scheduleThreadPool =
265    Executors.newScheduledThreadPool(1,
266      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
267
268  // Allocate or free space for the block
269  private transient BucketAllocator bucketAllocator;
270
271  /** Acceptable size of cache (no evictions if size < acceptable) */
272  private float acceptableFactor;
273
274  /** Minimum threshold of cache (when evicting, evict until size < min) */
275  private float minFactor;
276
277  /**
278   * Free this floating point factor of extra blocks when evicting. For example free the number of
279   * blocks requested * (1 + extraFreeFactor)
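   * <p>
   * For example, with the default extraFreeFactor of 0.10, a request to free 100 blocks' worth of
   * space targets roughly 110 blocks' worth.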
280   */
281  private float extraFreeFactor;
282
283  /** Single access bucket size */
284  private float singleFactor;
285
286  /** Multiple access bucket size */
287  private float multiFactor;
288
289  /** In-memory bucket size */
290  private float memoryFactor;
291
292  private long bucketcachePersistInterval;
293
294  private static final String FILE_VERIFY_ALGORITHM =
295    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
296  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
297
298  static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
299  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
300  private long queueAdditionWaitTime;
301  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity; the default algorithm is MD5.
304   */
305  private String algorithm;
306
307  private long persistenceChunkSize;
308
309  /* Tracing failed Bucket Cache allocations. */
310  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
311  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
312
313  private transient Map<String, HRegion> onlineRegions;
314
315  private long orphanBlockGracePeriod = 0;
316
317  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
318    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
319    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
320      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
321  }
322
323  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
324    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
325    Configuration conf) throws IOException {
326    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
327      persistencePath, ioErrorsTolerationDuration, conf, null);
328  }
329
330  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
331    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
332    Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache block size is set to " + blockSize + ", it must be greater than 0");
335    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
336    if (useStrongRef) {
337      this.offsetLock = new IdReadWriteLockStrongRef<>();
338    } else {
339      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
340    }
341    this.conf = conf;
342    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
343    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
344    this.writerThreads = new WriterThread[writerThreadNum];
345    this.onlineRegions = onlineRegions;
346    this.orphanBlockGracePeriod =
347      conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
348    long blockNumCapacity = capacity / blockSize;
349    if (blockNumCapacity >= Integer.MAX_VALUE) {
350      // Enough for about 32TB of cache!
351      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
352    }
353
    // this sets the dynamic configs
355    this.onConfigurationChange(conf);
356    this.cacheStats =
357      new BucketCacheStats(conf.getInt(BLOCKCACHE_STATS_PERIODS, DEFAULT_BLOCKCACHE_STATS_PERIODS),
358        conf.getInt(BLOCKCACHE_STATS_PERIOD_MINUTES_KEY, DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES));
359
360    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
361      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
362      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
363      + ", useStrongRef: " + useStrongRef);
364
365    this.cacheCapacity = capacity;
366    this.persistencePath = persistencePath;
367    this.blockSize = blockSize;
368    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
369    this.cacheState = CacheState.INITIALIZING;
370
371    this.allocFailLogPrevTs = 0;
372
373    for (int i = 0; i < writerThreads.length; ++i) {
374      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
375    }
376
377    assert writerQueues.size() == writerThreads.length;
378    this.ramCache = new RAMCache();
379
380    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
381    instantiateWriterThreads();
382
383    if (isCachePersistent()) {
384      if (ioEngine instanceof FileIOEngine) {
385        startBucketCachePersisterThread();
386      }
387      startPersistenceRetriever(bucketSizes, capacity);
388    } else {
389      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
390      this.cacheState = CacheState.ENABLED;
391      startWriterThreads();
392    }
393
394    // Run the statistics thread periodically to print the cache statistics log
395    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
396    // every five minutes.
397    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
398      statThreadPeriod, TimeUnit.SECONDS);
399    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
400      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
401      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
402      + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName());
403  }
404
405  private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
406    Runnable persistentCacheRetriever = () -> {
407      try {
408        retrieveFromFile(bucketSizes);
409        LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath);
410      } catch (Throwable ex) {
411        LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt."
412          + " Exception seen: ", persistencePath, ex);
413        backingMap.clear();
414        fullyCachedFiles.clear();
415        backingMapValidated.set(true);
416        regionCachedSize.clear();
417        try {
418          bucketAllocator = new BucketAllocator(capacity, bucketSizes);
419        } catch (BucketAllocatorException allocatorException) {
420          LOG.error("Exception during Bucket Allocation", allocatorException);
421        }
422      } finally {
423        this.cacheState = CacheState.ENABLED;
424        startWriterThreads();
425      }
426    };
427    Thread t = new Thread(persistentCacheRetriever);
428    t.start();
429  }
430
431  private void sanityCheckConfigs() {
432    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
433      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
434    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
435      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
436    Preconditions.checkArgument(minFactor <= acceptableFactor,
437      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
438    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than or equal to 0.0");
440    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
441      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
442    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
443      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
444    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
445      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
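    // The three priority factors partition the cache between single-access, multi-access and
    // in-memory blocks; e.g. the defaults 0.25 + 0.50 + 0.25 sum to exactly 1.0 as required below.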
446    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
447      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
448        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
449    if (this.persistenceChunkSize <= 0) {
450      persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
451    }
452  }
453
454  /**
455   * Called by the constructor to instantiate the writer threads.
456   */
457  private void instantiateWriterThreads() {
458    final String threadName = Thread.currentThread().getName();
459    for (int i = 0; i < this.writerThreads.length; ++i) {
460      this.writerThreads[i] = new WriterThread(this.writerQueues.get(i));
461      this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
462      this.writerThreads[i].setDaemon(true);
463    }
464  }
465
466  /**
467   * Called by the constructor to start the writer threads. Used by tests that need to override
468   * starting the threads.
469   */
470  protected void startWriterThreads() {
471    for (WriterThread thread : writerThreads) {
472      thread.start();
473    }
474  }
475
476  void startBucketCachePersisterThread() {
477    LOG.info("Starting BucketCachePersisterThread");
478    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
479    cachePersister.setDaemon(true);
480    cachePersister.start();
481  }
482
483  @Override
484  public boolean isCacheEnabled() {
485    return this.cacheState == CacheState.ENABLED;
486  }
487
488  @Override
489  public long getMaxSize() {
490    return this.cacheCapacity;
491  }
492
493  public String getIoEngine() {
494    return ioEngine.toString();
495  }
496
497  /**
498   * Get the IOEngine from the IO engine name
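   * Examples of accepted names (paths are illustrative): {@code offheap},
   * {@code file:/mnt/ssd/bucketcache.data}, {@code mmap:/mnt/ssd/bucketcache.mmap},
   * {@code pmem:/mnt/pmem/bucketcache.data}; with the {@code files:} prefix, multiple paths may be
   * given, separated by {@link FileIOEngine#FILE_DELIMITER}.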
499   * @return the IOEngine
500   */
501  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
502    throws IOException {
503    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep usage simple, the documented prefix is 'files:' whether one or multiple files are
      // given, but 'file:' is also supported for compatibility.
507      String[] filePaths =
508        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
509      return new FileIOEngine(capacity, persistencePath != null, filePaths);
510    } else if (ioEngineName.startsWith("offheap")) {
511      return new ByteBufferIOEngine(capacity);
512    } else if (ioEngineName.startsWith("mmap:")) {
513      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
514    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out as they would when mmapping
      // onto DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be referred to directly without having to copy them onheap. Once the RPC is done,
      // the blocks can be returned, as in the case of ByteBufferIOEngine.
521      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
522    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache - prefix with file:, files:, mmap:, pmem: or offheap");
525    }
526  }
527
528  public boolean isCachePersistenceEnabled() {
529    return persistencePath != null;
530  }
531
532  /**
533   * Cache the block with the specified name and buffer.
534   * @param cacheKey block's cache key
535   * @param buf      block buffer
536   */
537  @Override
538  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
539    cacheBlock(cacheKey, buf, false);
540  }
541
542  /**
543   * Cache the block with the specified name and buffer.
544   * @param cacheKey   block's cache key
545   * @param cachedItem block buffer
546   * @param inMemory   if block is in-memory
547   */
548  @Override
549  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
550    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
551  }
552
553  /**
554   * Cache the block with the specified name and buffer.
555   * @param cacheKey   block's cache key
556   * @param cachedItem block buffer
557   * @param inMemory   if block is in-memory
558   */
559  @Override
560  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
561    boolean waitWhenCache) {
562    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
563  }
564
565  /**
566   * Cache the block to ramCache
567   * @param cacheKey   block's cache key
568   * @param cachedItem block buffer
569   * @param inMemory   if block is in-memory
570   * @param wait       if true, blocking wait when queue is full
571   */
572  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
573    boolean wait) {
574    if (isCacheEnabled()) {
575      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
576        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
577          BucketEntry bucketEntry = backingMap.get(cacheKey);
578          if (bucketEntry != null && bucketEntry.isRpcRef()) {
579            // avoid replace when there are RPC refs for the bucket entry in bucket cache
580            return;
581          }
582          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
583        }
584      } else {
585        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
586      }
587    }
588  }
589
590  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
591    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
592  }
593
594  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
595    boolean inMemory, boolean wait) {
596    if (!isCacheEnabled()) {
597      return;
598    }
599    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
600      cacheKey.setBlockType(cachedItem.getBlockType());
601    }
602    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
603    // Stuff the entry into the RAM cache so it can get drained to the persistent store
604    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
605      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
606    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated when
     * replacing an entry in ramCache. But the WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), the entry removed by the WriterThread
     * might not be the correct one, and the heap size accounting would get corrupted (HBASE-20789).
612     */
613    if (ramCache.putIfAbsent(cacheKey, re) != null) {
614      return;
615    }
616    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
617    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
618    boolean successfulAddition = false;
619    if (wait) {
620      try {
621        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
622      } catch (InterruptedException e) {
623        LOG.error("Thread interrupted: ", e);
624        Thread.currentThread().interrupt();
625      }
626    } else {
627      successfulAddition = bq.offer(re);
628    }
629    if (!successfulAddition) {
630      LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
631      ramCache.remove(cacheKey);
632      cacheStats.failInsert();
633    } else {
634      this.blockNumber.increment();
635      this.heapSize.add(cachedItem.heapSize());
636    }
637  }
638
639  /**
640   * If the passed cache key relates to a reference (&lt;hfile&gt;.&lt;parentEncRegion&gt;), this
641   * method looks for the block from the referred file, in the cache. If present in the cache, the
642   * block for the referred file is returned, otherwise, this method returns null. It will also
643   * return null if the passed cache key doesn't relate to a reference.
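   * For example (file names are hypothetical), a key for the reference file {@code abc123.d4e5f6}
   * at offset 1024 is resolved by looking up the key for file {@code abc123} at offset 1024 in the
   * backing map.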
644   * @param key the BlockCacheKey instance to look for in the cache.
645   * @return the cached block from the referred file, null if there's no such block in the cache or
646   *         the passed key doesn't relate to a reference.
647   */
648  public BucketEntry getBlockForReference(BlockCacheKey key) {
649    BucketEntry foundEntry = null;
650    String referredFileName = null;
651    if (StoreFileInfo.isReference(key.getHfileName())) {
652      referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond();
653    }
654    if (referredFileName != null) {
655      BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset());
656      foundEntry = backingMap.get(convertedCacheKey);
657      LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(),
658        convertedCacheKey, foundEntry);
659    }
660    return foundEntry;
661  }
662
663  /**
664   * Get the buffer of the block with the specified key.
665   * @param key                block's cache key
666   * @param caching            true if the caller caches blocks on cache misses
667   * @param repeat             Whether this is a repeat lookup for the same block
668   * @param updateCacheMetrics Whether we should update cache metrics or not
669   * @return buffer of specified cache key, or null if not in cache
670   */
671  @Override
672  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
673    boolean updateCacheMetrics) {
674    if (!isCacheEnabled()) {
675      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
676      return null;
677    }
678    RAMQueueEntry re = ramCache.get(key);
679    if (re != null) {
680      if (updateCacheMetrics) {
681        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
682      }
683      re.access(accessCount.incrementAndGet());
684      return re.getData();
685    }
686    BucketEntry bucketEntry = backingMap.get(key);
687    if (bucketEntry == null) {
688      bucketEntry = getBlockForReference(key);
689    }
690    if (bucketEntry != null) {
691      long start = System.nanoTime();
692      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
693      try {
694        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key, because its offset
        // may have changed. If we locked on the BlockCacheKey instead of the offset, then we could
        // only check existence here.
698        if (
699          bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key))
700        ) {
          // Read the block from IOEngine based on the bucketEntry's offset and length. NOTICE: the
          // block will use the refCnt of bucketEntry, which means that if two HFileBlocks map to
          // the same BucketEntry, the two blocks and the bucket entry all share the same refCnt.
704          Cacheable cachedBlock = ioEngine.read(bucketEntry);
705          if (ioEngine.usesSharedMemory()) {
            // If the IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt; do retain here, in order to count the number of RPC references
708            cachedBlock.retain();
709          }
710          // Update the cache statistics.
711          if (updateCacheMetrics) {
712            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
713            cacheStats.ioHit(System.nanoTime() - start);
714          }
715          bucketEntry.access(accessCount.incrementAndGet());
716          if (this.ioErrorStartTime > 0) {
717            ioErrorStartTime = -1;
718          }
719          return cachedBlock;
720        }
721      } catch (HBaseIOException hioex) {
722        // When using file io engine persistent cache,
723        // the cache map state might differ from the actual cache. If we reach this block,
724        // we should remove the cache key entry from the backing map
725        backingMap.remove(key);
726        fileNotFullyCached(key, bucketEntry);
727        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
728      } catch (IOException ioex) {
729        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
730        checkIOErrorIsTolerated();
731      } finally {
732        lock.readLock().unlock();
733      }
734    }
735    if (!repeat && updateCacheMetrics) {
736      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
737    }
738    return null;
739  }
740
741  /**
742   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
743   */
744  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
745    boolean evictedByEvictionProcess) {
746    bucketEntry.markAsEvicted();
747    blocksByHFile.remove(cacheKey);
748    if (decrementBlockNumber) {
749      this.blockNumber.decrement();
750      if (ioEngine.isPersistent()) {
751        fileNotFullyCached(cacheKey, bucketEntry);
752      }
753    }
754    if (evictedByEvictionProcess) {
755      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
756    }
757    if (ioEngine.isPersistent()) {
758      setCacheInconsistent(true);
759    }
760  }
761
762  private void fileNotFullyCached(BlockCacheKey key, BucketEntry entry) {
    // Update the region cached size before removing the file from fullyCachedFiles.
    // This computation should happen even if the file is not in the fullyCachedFiles map.
765    updateRegionCachedSize(key.getFilePath(), (entry.getLength() * -1));
766    fullyCachedFiles.remove(key.getHfileName());
767  }
768
769  public void fileCacheCompleted(Path filePath, long size) {
770    Pair<String, Long> pair = new Pair<>();
771    // sets the region name
772    String regionName = filePath.getParent().getParent().getName();
773    pair.setFirst(regionName);
774    pair.setSecond(size);
775    fullyCachedFiles.put(filePath.getName(), pair);
776  }
777
778  private void updateRegionCachedSize(Path filePath, long cachedSize) {
779    if (filePath != null) {
780      if (HFileArchiveUtil.isHFileArchived(filePath)) {
781        LOG.trace("Skipping region cached size update for archived file: {}", filePath);
782      } else {
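        // Store file paths end in .../<region>/<column family>/<file>, so the grandparent
        // directory name is the encoded region name.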
783        String regionName = filePath.getParent().getParent().getName();
784        regionCachedSize.merge(regionName, cachedSize,
785          (previousSize, newBlockSize) -> previousSize + newBlockSize);
786        LOG.trace("Updating region cached size for region: {}", regionName);
787        // If all the blocks for a region are evicted from the cache,
788        // remove the entry for that region from regionCachedSize map.
789        if (regionCachedSize.get(regionName) <= 0) {
790          regionCachedSize.remove(regionName);
791        }
792      }
793    }
794  }
795
796  /**
   * Actually free the {@link BucketEntry}, which may only be invoked when the
   * {@link BucketEntry#refCnt} becomes 0.
799   */
800  void freeBucketEntry(BucketEntry bucketEntry) {
801    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
802    realCacheSize.add(-1 * bucketEntry.getLength());
803  }
804
805  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * First, we'll try to remove the block from the RAMCache, and then try to evict it from the
   * backingMap. Here we remove the block from the backingMap immediately, but only release the
   * reference held by the bucket cache by calling {@link BucketEntry#markAsEvicted}. If there are
   * still RPCs referring to this block, it can only be de-allocated once all of them release it.
   * <p>
   * NOTICE: we need to grab the offset write lock before releasing the reference held by the
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory leak.
   * @param cacheKey Block to evict
   * @return true if the block was evicted successfully, false otherwise.
820   */
821  @Override
822  public boolean evictBlock(BlockCacheKey cacheKey) {
823    return doEvictBlock(cacheKey, null, false);
824  }
825
826  /**
827   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
828   * {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
   * and {@link BucketEntry} pair can be removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted successfully, false otherwise.
834   */
835  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
836    boolean evictedByEvictionProcess) {
837    if (!isCacheEnabled()) {
838      return false;
839    }
840    boolean existedInRamCache = removeFromRamCache(cacheKey);
841    if (bucketEntry == null) {
842      bucketEntry = backingMap.get(cacheKey);
843    }
844    final BucketEntry bucketEntryToUse = bucketEntry;
845
846    if (bucketEntryToUse == null) {
847      if (existedInRamCache && evictedByEvictionProcess) {
848        cacheStats.evicted(0, cacheKey.isPrimary());
849      }
850      return existedInRamCache;
851    } else {
852      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
853        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
854          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
855            cacheKey, bucketEntryToUse.offset());
856          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
857          return true;
858        }
859        return false;
860      });
861    }
862  }
863
864  /**
865   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
869   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
870   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
871   *   it is the return value of current {@link BucketCache#createRecycler} method.
872   *
873   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
874   *   it is {@link ByteBuffAllocator#putbackBuffer}.
875   * </pre>
876   */
877  public Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> freeBucketEntry(bucketEntry);
882  }
883
884  /**
   * NOTE: This method is only used by tests.
886   */
887  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
888    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
889    if (bucketEntry == null) {
890      return false;
891    }
892    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
893  }
894
895  /**
896   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
897   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
   * and {@link BucketEntry} pair can be removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block was evicted successfully, false otherwise.
903   */
904  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
905    if (!bucketEntry.isRpcRef()) {
906      return doEvictBlock(blockCacheKey, bucketEntry, true);
907    }
908    return false;
909  }
910
911  /**
   * Since HBASE-29249, the following properties governing freeSpace behaviour and block priorities
   * were made dynamically configurable:
   * <ul>
   * <li>hbase.bucketcache.acceptfactor</li>
   * <li>hbase.bucketcache.minfactor</li>
   * <li>hbase.bucketcache.extrafreefactor</li>
   * <li>hbase.bucketcache.single.factor</li>
   * <li>hbase.bucketcache.multi.factor</li>
   * <li>hbase.bucketcache.memory.factor</li>
   * </ul>
   * The hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the
   * publishing of blocks to the cache writer threads during prefetch reads only (client reads
   * wouldn't get delayed). It has also been made dynamically configurable since HBASE-29249. The
   * hbase.bucketcache.persist.intervalinmillis property determines the frequency for saving the
   * persistent cache, and it has also been made dynamically configurable since HBASE-29249. The
   * hbase.bucketcache.persistence.chunksize property determines the size of the persistent file
   * splits (due to the limitation on the maximum allowed protobuf size), and it has also been made
   * dynamically configurable since HBASE-29249.
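   * <p>
   * As an illustrative sketch (not a prescribed workflow), a holder of a BucketCache reference can
   * apply new values like this:
   *
   * <pre>{@code
   * Configuration updated = new Configuration(currentConf);
   * updated.setFloat("hbase.bucketcache.acceptfactor", 0.90f);
   * updated.setFloat("hbase.bucketcache.minfactor", 0.80f);
   * bucketCache.onConfigurationChange(updated); // re-reads the factors and re-runs sanity checks
   * }</pre>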
925   * @param config the new configuration to be updated.
926   */
927  @Override
928  public void onConfigurationChange(Configuration config) {
    this.acceptableFactor = config.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = config.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor =
      config.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = config.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = config.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = config.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      config.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.bucketcachePersistInterval = config.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
    this.persistenceChunkSize =
      config.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
940    sanityCheckConfigs();
941  }
942
943  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
944    return ramCache.remove(cacheKey, re -> {
945      if (re != null) {
946        this.blockNumber.decrement();
947        this.heapSize.add(-1 * re.getData().heapSize());
948      }
949    });
950  }
951
952  public boolean isCacheInconsistent() {
953    return isCacheInconsistent.get();
954  }
955
956  public void setCacheInconsistent(boolean setCacheInconsistent) {
957    isCacheInconsistent.set(setCacheInconsistent);
958  }
959
960  protected void setCacheState(CacheState state) {
961    cacheState = state;
962  }
963
964  /*
965   * Statistics thread. Periodically output cache statistics to the log.
966   */
967  private static class StatisticsThread extends Thread {
968    private final BucketCache bucketCache;
969
970    public StatisticsThread(BucketCache bucketCache) {
971      super("BucketCacheStatsThread");
972      setDaemon(true);
973      this.bucketCache = bucketCache;
974    }
975
976    @Override
977    public void run() {
978      bucketCache.logStats();
979    }
980  }
981
982  public void logStats() {
983    if (!isCacheInitialized("BucketCache::logStats")) {
984      return;
985    }
986
987    long totalSize = bucketAllocator.getTotalSize();
988    long usedSize = bucketAllocator.getUsedSize();
989    long freeSize = totalSize - usedSize;
990    long cacheSize = getRealCacheSize();
991    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
992      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
993      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
994      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
995      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
996      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
997      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
998      + (cacheStats.getHitCount() == 0
999        ? "0,"
1000        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
1001      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
1002      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
1003      + (cacheStats.getHitCachingCount() == 0
1004        ? "0,"
1005        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
1006      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
1007      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
1008      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
1009      + backingMap.size());
1010    cacheStats.reset();
1011
1012    bucketAllocator.logDebugStatistics();
1013  }
1014
1015  public long getRealCacheSize() {
1016    return this.realCacheSize.sum();
1017  }
1018
1019  public long acceptableSize() {
1020    if (!isCacheInitialized("BucketCache::acceptableSize")) {
1021      return 0;
1022    }
1023    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
1024  }
1025
1026  long getPartitionSize(float partitionFactor) {
1027    if (!isCacheInitialized("BucketCache::getPartitionSize")) {
1028      return 0;
1029    }
1030
1031    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
1032  }
1033
1034  /**
   * Return the count of bucketSizeInfos that still need free space
1036   */
1037  private int bucketSizesAboveThresholdCount(float minFactor) {
1038    if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) {
1039      return 0;
1040    }
1041
1042    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
1043    int fullCount = 0;
1044    for (int i = 0; i < stats.length; i++) {
1045      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
1046      freeGoal = Math.max(freeGoal, 1);
1047      if (stats[i].freeCount() < freeGoal) {
1048        fullCount++;
1049      }
1050    }
1051    return fullCount;
1052  }
1053
1054  /**
   * This method finds the buckets that are minimally occupied and are not reference counted, and
   * frees them completely without any constraint on the access times of their elements. It frees
   * at most the number of buckets passed; it may free fewer because refCounts can change while it
   * runs.
1059   * @param completelyFreeBucketsNeeded number of buckets to free
1060   **/
1061  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
1062    if (!isCacheInitialized("BucketCache::freeEntireBuckets")) {
1063      return;
1064    }
1065
1066    if (completelyFreeBucketsNeeded != 0) {
      // First we will build a set of the buckets whose entries are reference counted; usually
      // this set is small, around O(handler count), unless something else is wrong
1069      Set<Integer> inUseBuckets = new HashSet<>();
1070      backingMap.forEach((k, be) -> {
1071        if (be.isRpcRef()) {
1072          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
1073        }
1074      });
1075      Set<Integer> candidateBuckets =
1076        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
1077      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
1078        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
1079          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
1080        }
1081      }
1082    }
1083  }
1084
1085  private long calculateBytesToFree(StringBuilder msgBuffer) {
1086    long bytesToFreeWithoutExtra = 0;
1087    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
1088    long[] bytesToFreeForBucket = new long[stats.length];
1089    for (int i = 0; i < stats.length; i++) {
1090      bytesToFreeForBucket[i] = 0;
1091      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
1092      freeGoal = Math.max(freeGoal, 1);
1093      if (stats[i].freeCount() < freeGoal) {
1094        bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
1095        bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
1096        if (msgBuffer != null) {
1097          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
1098            + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
1099        }
1100      }
1101    }
1102    if (msgBuffer != null) {
1103      msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
1104    }
1105    return bytesToFreeWithoutExtra;
1106  }
1107
1108  /**
   * Free space if the used size reaches acceptableSize() or a block of some size couldn't be
   * allocated. When freeing space, we use an LRU algorithm and ensure that at least some blocks
   * are evicted. The steps are outlined below.
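   * <p>
   * In outline (summarizing the implementation below): first, "orphan" blocks older than the
   * configured grace period whose files are not in any online region are evicted; next, blocks
   * belonging to files reported as cold by the DataTieringManager are evicted; then the remaining
   * entries are grouped by priority (single, multi, in-memory) and each group frees its share,
   * least recently accessed entries first; finally, if some bucket sizes are still full, entire
   * least-filled buckets are freed.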
1112   * @param why Why we are being called
1113   */
1114  void freeSpace(final String why) {
1115    if (!isCacheInitialized("BucketCache::freeSpace")) {
1116      return;
1117    }
    // Ensure only one freeSpace run is in progress at a time
1119    if (!freeSpaceLock.tryLock()) {
1120      return;
1121    }
1122    try {
1123      freeInProgress = true;
1124      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
1125      long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer);
1126      if (bytesToFreeWithoutExtra <= 0) {
1127        return;
1128      }
1129      long currentSize = bucketAllocator.getUsedSize();
1130      long totalSize = bucketAllocator.getTotalSize();
1131      if (LOG.isDebugEnabled() && msgBuffer != null) {
1132        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used="
1133          + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
1134          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
1135          + StringUtils.byteDesc(totalSize));
1136      }
1137      long bytesToFreeWithExtra =
1138        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
1139      // Instantiate priority buckets
1140      BucketEntryGroup bucketSingle =
1141        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
1142      BucketEntryGroup bucketMulti =
1143        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
1144      BucketEntryGroup bucketMemory =
1145        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));
1146
1147      Set<String> allValidFiles = null;
1148      // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not.
1149      // See further comments below for more details.
1150      if (onlineRegions != null) {
1151        allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions);
1152      }
      // the cached time is recorded in nanos, so we need to convert the grace period accordingly
1154      long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
1155      long bytesFreed = 0;
1156      // Check the list of files to determine the cold files which can be readily evicted.
1157      Map<String, String> coldFiles = null;
1158
1159      DataTieringManager dataTieringManager = DataTieringManager.getInstance();
1160      if (dataTieringManager != null) {
1161        coldFiles = dataTieringManager.getColdFilesList();
1162      }
1163      // Scan entire map putting bucket entry into appropriate bucket entry
1164      // group
1165      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
1166        BlockCacheKey key = bucketEntryWithKey.getKey();
1167        BucketEntry entry = bucketEntryWithKey.getValue();
        // Under certain conditions, blocks for regions not on the current region server might
        // be lingering in the cache. For example, when using the persistent cache feature, if the
        // RS crashes and different regions are assigned to it once it is back online, blocks for
        // the previously online regions would be recovered and stay in the cache. These are
        // "orphan" blocks, as the files these blocks belong to are not in any of the online
        // regions.
        // "Orphan" blocks are a pure waste of cache space and should be evicted first during
        // the freespace run.
        // Compactions and flushes may cache blocks before their files are completely written. In
        // these cases the file won't be found in any of the online regions' stores, but the block
        // shouldn't be evicted. To avoid this, the
        // hbase.bucketcache.block.orphan.evictgraceperiod property defines a grace period
        // (default 24 hours); only blocks older than this are checked for being orphans.
1181        if (
1182          allValidFiles != null
1183            && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos)
1184        ) {
1185          if (!allValidFiles.contains(key.getHfileName())) {
1186            if (evictBucketEntryIfNoRpcReferenced(key, entry)) {
1187              // We calculate the freed bytes, but we don't stop if the goal was reached because
1188              // these are orphan blocks anyway, so let's leverage this run of freeSpace
1189              // to get rid of all orphans at once.
1190              bytesFreed += entry.getLength();
1191              continue;
1192            }
1193          }
1194        }
1195
1196        if (
1197          bytesFreed < bytesToFreeWithExtra && coldFiles != null
1198            && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
1199        ) {
1200          int freedBlockSize = bucketEntryWithKey.getValue().getLength();
1201          if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
1202            bytesFreed += freedBlockSize;
1203          }
1204          continue;
1205        }
1206
1207        switch (entry.getPriority()) {
1208          case SINGLE: {
1209            bucketSingle.add(bucketEntryWithKey);
1210            break;
1211          }
1212          case MULTI: {
1213            bucketMulti.add(bucketEntryWithKey);
1214            break;
1215          }
1216          case MEMORY: {
1217            bucketMemory.add(bucketEntryWithKey);
1218            break;
1219          }
1220        }
1221      }
1222
1223      // Check if the cold file eviction is sufficient to create enough space.
1224      bytesToFreeWithExtra -= bytesFreed;
1225      if (bytesToFreeWithExtra <= 0) {
1226        LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
1227          StringUtils.byteDesc(bytesFreed));
1228        return;
1229      }
1230
1231      if (LOG.isDebugEnabled()) {
1232        LOG.debug(
1233          "Bucket cache free space completed; freed space : {} "
1234            + "bytes of cold data blocks. {} more bytes required to be freed.",
1235          StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
1236      }
1237
1238      PriorityQueue<BucketEntryGroup> bucketQueue =
1239        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
1240
1241      bucketQueue.add(bucketSingle);
1242      bucketQueue.add(bucketMulti);
1243      bucketQueue.add(bucketMemory);
1244
1245      int remainingBuckets = bucketQueue.size();
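      // A worked example with hypothetical numbers: if bytesToFreeWithoutExtra - bytesFreed is
      // 300MB and all three groups overflow, each group is first asked to free at most 100MB
      // (the remainder divided by the remaining group count), capped by its own overflow. The
      // queue polls the smallest overflow first, so groups with larger overflow are visited when
      // fewer groups remain and may be asked to free a larger share of what is still needed.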
1246      BucketEntryGroup bucketGroup;
1247      while ((bucketGroup = bucketQueue.poll()) != null) {
1248        long overflow = bucketGroup.overflow();
1249        if (overflow > 0) {
1250          long bucketBytesToFree =
1251            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
1252          bytesFreed += bucketGroup.free(bucketBytesToFree);
1253        }
1254        remainingBuckets--;
1255      }
1256
      // Check and free again if there are buckets that still need space freed
1258      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
1259        bucketQueue.clear();
1260        remainingBuckets = 3;
1261        bucketQueue.add(bucketSingle);
1262        bucketQueue.add(bucketMulti);
1263        bucketQueue.add(bucketMemory);
1264        while ((bucketGroup = bucketQueue.poll()) != null) {
1265          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
1266          bytesFreed += bucketGroup.free(bucketBytesToFree);
1267          remainingBuckets--;
1268        }
1269      }
      // Even after the above free we might still need freeing because of the
      // de-fragmentation of the buckets (also called the slab calcification problem), i.e.
      // there might be some buckets where occupancy is very sparse and thus they are not
      // yielding free space for the other bucket sizes. The fix for this is to evict some
      // of the buckets; we do this by evicting the buckets that are least full.
1275      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));
1276
      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
          + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
          + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
          + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
      }
1288    } catch (Throwable t) {
1289      LOG.warn("Failed freeing space", t);
1290    } finally {
1291      cacheStats.evict();
1292      freeInProgress = false;
1293      freeSpaceLock.unlock();
1294    }
1295  }
1296
1297  // This handles flushing the RAM cache to IOEngine.
1298  class WriterThread extends Thread {
1299    private final BlockingQueue<RAMQueueEntry> inputQueue;
1300    private volatile boolean writerEnabled = true;
1301    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
1302
1303    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
1304      super("BucketCacheWriterThread");
1305      this.inputQueue = queue;
1306    }
1307
    // Used for tests.
1309    void disableWriter() {
1310      this.writerEnabled = false;
1311    }
1312
1313    @Override
1314    public void run() {
1315      List<RAMQueueEntry> entries = new ArrayList<>();
1316      try {
1317        while (isCacheEnabled() && writerEnabled) {
1318          try {
1319            try {
1320              // Blocks
1321              entries = getRAMQueueEntries(inputQueue, entries);
1322            } catch (InterruptedException ie) {
1323              if (!isCacheEnabled() || !writerEnabled) {
1324                break;
1325              }
1326            }
1327            doDrain(entries, metaBuff);
1328          } catch (Exception ioe) {
1329            LOG.error("WriterThread encountered error", ioe);
1330          }
1331        }
1332      } catch (Throwable t) {
1333        LOG.warn("Failed doing drain", t);
1334      }
1335      LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled());
1336    }
1337  }
1338
1339  /**
1340   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
1341   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
1342   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
1343   * with the same cache key do the same thing for the same cache key, so if not evict the previous
1344   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
1345   * bucketAllocator do not free its memory.
1346   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
1347   *      cacheKey, Cacheable newBlock)
1348   * @param key         Block cache key
1349   * @param bucketEntry Bucket entry to put into backingMap.
1350   */
1351  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
1352    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
1353    blocksByHFile.add(key);
1354    updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength());
1355    if (previousEntry != null && previousEntry != bucketEntry) {
1356      previousEntry.withWriteLock(offsetLock, () -> {
1357        blockEvicted(key, previousEntry, false, false);
1358        return null;
1359      });
1360    }
1361  }
1362
1363  /**
1364   * Prepare and return a warning message for Bucket Allocator Exception
1365   * @param fle The exception
1366   * @param re  The RAMQueueEntry for which the exception was thrown.
1367   * @return A warning message created from the input RAMQueueEntry object.
1368   */
1369  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1370    final RAMQueueEntry re) {
1371    final StringBuilder sb = new StringBuilder();
1372    sb.append("Most recent failed allocation after ");
1373    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1374    sb.append(" ms;");
1375    if (re != null) {
1376      if (re.getData() instanceof HFileBlock) {
1377        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1378        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1379        final String tableName = Bytes.toString(fileContext.getTableName());
1380        if (tableName != null) {
1381          sb.append(" Table: ");
1382          sb.append(tableName);
1383        }
1384        if (columnFamily != null) {
1385          sb.append(" CF: ");
1386          sb.append(columnFamily);
1387        }
1388        sb.append(" HFile: ");
1389        if (fileContext.getHFileName() != null) {
1390          sb.append(fileContext.getHFileName());
1391        } else {
1392          sb.append(re.getKey());
1393        }
1394      } else {
1395        sb.append(" HFile: ");
1396        sb.append(re.getKey());
1397      }
1398    }
1399    sb.append(" Message: ");
1400    sb.append(fle.getMessage());
1401    return sb.toString();
1402  }
1403
1404  /**
1405   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
1406   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
1407   * references and we'll OOME.
1408   * @param entries Presumes list passed in here will be processed by this invocation only. No
1409   *                interference expected.
1410   */
1411  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1412    if (entries.isEmpty()) {
1413      return;
1414    }
1415    // This method is a little hard to follow. We run through the passed in entries and for each
1416    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1417    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1418    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
    // filling ramCache. We do the clean up by again running through the passed in entries,
    // doing extra work when we find a non-null corresponding entry in bucketEntries.
1421    final int size = entries.size();
1422    BucketEntry[] bucketEntries = new BucketEntry[size];
1423    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1424    // when we go to add an entry by going around the loop again without upping the index.
1425    int index = 0;
1426    while (isCacheEnabled() && index < size) {
1427      RAMQueueEntry re = null;
1428      try {
1429        re = entries.get(index);
1430        if (re == null) {
1431          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1432          index++;
1433          continue;
1434        }
1435        // Reset the position for reuse.
1436        // It should be guaranteed that the data in the metaBuff has been transferred to the
1437        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1438        // transferred with our current IOEngines. Should take care, when we have new kinds of
1439        // IOEngine in the future.
1440        metaBuff.clear();
1441        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
1442          this::createRecycler, metaBuff, acceptableSize());
1443        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1444        bucketEntries[index] = bucketEntry;
1445        if (ioErrorStartTime > 0) {
1446          ioErrorStartTime = -1;
1447        }
1448        index++;
1449      } catch (BucketAllocatorException fle) {
1450        long currTs = EnvironmentEdgeManager.currentTime();
1451        cacheStats.allocationFailed(); // Record the warning.
1452        if (
1453          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1454        ) {
1455          LOG.warn(getAllocationFailWarningMessage(fle, re));
1456          allocFailLogPrevTs = currTs;
1457        }
1458        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1459        bucketEntries[index] = null;
1460        index++;
1461      } catch (CacheFullException cfe) {
1462        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1463        if (!freeInProgress && !re.isPrefetch()) {
1464          freeSpace("Full!");
1465        } else if (re.isPrefetch()) {
1466          bucketEntries[index] = null;
1467          index++;
1468        } else {
1469          Thread.sleep(50);
1470        }
1471      } catch (IOException ioex) {
1472        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1473        LOG.error("Failed writing to bucket cache", ioex);
1474        checkIOErrorIsTolerated();
1475      }
1476    }
1477
1478    // Make sure data pages are written on media before we update maps.
1479    try {
1480      ioEngine.sync();
1481    } catch (IOException ioex) {
1482      LOG.error("Failed syncing IO engine", ioex);
1483      checkIOErrorIsTolerated();
1484      // Since we failed sync, free the blocks in bucket allocator
1485      for (int i = 0; i < entries.size(); ++i) {
1486        BucketEntry bucketEntry = bucketEntries[i];
1487        if (bucketEntry != null) {
1488          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
1489          bucketEntries[i] = null;
1490        }
1491      }
1492    }
1493
1494    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1495    // success or error.
1496    for (int i = 0; i < size; ++i) {
1497      BlockCacheKey key = entries.get(i).getKey();
1498      // Only add if non-null entry.
1499      if (bucketEntries[i] != null) {
1500        putIntoBackingMap(key, bucketEntries[i]);
1501        if (ioEngine.isPersistent()) {
1502          setCacheInconsistent(true);
1503        }
1504      }
1505      // Always remove from ramCache even if we failed adding it to the block cache above.
1506      boolean existed = ramCache.remove(key, re -> {
1507        if (re != null) {
1508          heapSize.add(-1 * re.getData().heapSize());
1509        }
1510      });
1511      if (!existed && bucketEntries[i] != null) {
1512        // Block should have already been evicted. Remove it and free space.
1513        final BucketEntry bucketEntry = bucketEntries[i];
1514        bucketEntry.withWriteLock(offsetLock, () -> {
1515          if (backingMap.remove(key, bucketEntry)) {
1516            blockEvicted(key, bucketEntry, false, false);
1517          }
1518          return null;
1519        });
1520      }
1521      long used = bucketAllocator.getUsedSize();
1522      if (!entries.get(i).isPrefetch() && used > acceptableSize()) {
1523        LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey());
1524        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1525      }
1526    }
1527
1528  }
1529
1530  /**
1531   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1532   * returning.
1533   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1534   *                   in case.
1535   * @param q          The queue to take from.
   * @return {@code receptacle} laden with elements taken from the queue; never empty, since this
   *         blocks on the queue until at least one element is available.
1537   */
1538  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1539    List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it's
    // ok even if the list grew to accommodate thousands.
1542    receptacle.clear();
1543    receptacle.add(q.take());
1544    q.drainTo(receptacle);
1545    return receptacle;
1546  }
1547
1548  /**
1549   * @see #retrieveFromFile(int[])
1550   */
1551  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1552      justification = "false positive, try-with-resources ensures close is called.")
1553  void persistToFile() throws IOException {
1554    LOG.debug("Thread {} started persisting bucket cache to file",
1555      Thread.currentThread().getName());
1556    if (!isCachePersistent()) {
1557      throw new IOException("Attempt to persist non-persistent cache mappings!");
1558    }
1559    File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
1560    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
1561      LOG.debug("Persist in new chunked persistence format.");
1562
1563      persistChunkedBackingMap(fos);
1564
1565      LOG.debug(
1566        "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {},"
1567          + " file name: {}",
1568        backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName());
1569    } catch (IOException e) {
1570      LOG.error("Failed to persist bucket cache to file", e);
1571      throw e;
1572    } catch (Throwable e) {
1573      LOG.error("Failed during persist bucket cache to file: ", e);
1574      throw e;
1575    }
1576    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
1577      Thread.currentThread().getName());
1578    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
1579      LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if "
1580        + "RS crashes/restarts before we successfully checkpoint again.");
1581    }
1582  }
1583
1584  public boolean isCachePersistent() {
1585    return ioEngine.isPersistent() && persistencePath != null;
1586  }
1587
1588  @Override
1589  public Optional<Map<String, Long>> getRegionCachedInfo() {
1590    return Optional.of(Collections.unmodifiableMap(regionCachedSize));
1591  }
1592
1593  /**
1594   * @see #persistToFile()
1595   */
1596  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1597    LOG.info("Started retrieving bucket cache from file");
1598    File persistenceFile = new File(persistencePath);
1599    if (!persistenceFile.exists()) {
1600      LOG.warn("Persistence file missing! "
1601        + "It's ok if it's first run after enabling persistent cache.");
1602      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1603      blockNumber.add(backingMap.size());
1604      backingMapValidated.set(true);
1605      return;
1606    }
1607    assert !isCacheEnabled();
1608
1609    try (FileInputStream in = new FileInputStream(persistenceFile)) {
1610      int pblen = ProtobufMagic.lengthOfPBMagic();
1611      byte[] pbuf = new byte[pblen];
1612      IOUtils.readFully(in, pbuf, 0, pblen);
1613      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
1614        LOG.info("Reading old format of persistence.");
1615        // The old non-chunked version of backing map persistence.
1616        parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1617      } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
1618        // The new persistence format of chunked persistence.
1619        LOG.info("Reading new chunked format of persistence.");
1620        retrieveChunkedBackingMap(in);
1621      } else {
1622        // In 3.0 we have enough flexibility to dump the old cache data.
1623        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1624        throw new IOException(
1625          "Persistence file does not start with protobuf magic number. " + persistencePath);
1626      }
1627      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1628      blockNumber.add(backingMap.size());
1629      LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
1630    }
1631  }
1632
1633  private void updateRegionSizeMapWhileRetrievingFromFile() {
1634    // Update the regionCachedSize with the region size while restarting the region server
1635    if (LOG.isDebugEnabled()) {
1636      LOG.debug("Updating region size map after retrieving cached file list");
1637      dumpPrefetchList();
1638    }
1639    regionCachedSize.clear();
1640    fullyCachedFiles.forEach((hFileName, hFileSize) -> {
1641      // Get the region name for each file
1642      String regionEncodedName = hFileSize.getFirst();
1643      long cachedFileSize = hFileSize.getSecond();
1644      regionCachedSize.merge(regionEncodedName, cachedFileSize,
1645        (oldpf, fileSize) -> oldpf + fileSize);
1646    });
1647  }
1648
1649  private void dumpPrefetchList() {
1650    for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) {
1651      LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(),
1652        outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond());
1653    }
1654  }
1655
1656  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1657    throws IOException {
1658    if (capacitySize != cacheCapacity) {
1659      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1660        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1661    }
1662    if (!ioEngine.getClass().getName().equals(ioclass)) {
1663      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1664        + ioEngine.getClass().getName());
1665    }
1666    if (!backingMap.getClass().getName().equals(mapclass)) {
1667      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1668        + backingMap.getClass().getName());
1669    }
1670  }
1671
1672  private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
1673    try {
1674      if (proto.hasChecksum()) {
1675        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1676          algorithm);
1677      }
1678      backingMapValidated.set(true);
1679    } catch (IOException e) {
1680      LOG.warn("Checksum for cache file failed. "
1681        + "We need to validate each cache key in the backing map. "
1682        + "This may take some time, so we'll do it in a background thread,");
1683
1684      Runnable cacheValidator = () -> {
1685        while (bucketAllocator == null) {
1686          try {
1687            Thread.sleep(50);
1688          } catch (InterruptedException ex) {
1689            throw new RuntimeException(ex);
1690          }
1691        }
1692        long startTime = EnvironmentEdgeManager.currentTime();
1693        int totalKeysOriginally = backingMap.size();
1694        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
1695          try {
1696            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
1697          } catch (IOException e1) {
1698            LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
1699            evictBlock(keyEntry.getKey());
1700            fileNotFullyCached(keyEntry.getKey(), keyEntry.getValue());
1701          }
1702        }
1703        backingMapValidated.set(true);
1704        LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
1705          totalKeysOriginally, backingMap.size(),
1706          (EnvironmentEdgeManager.currentTime() - startTime));
1707      };
1708      Thread t = new Thread(cacheValidator);
1709      t.setDaemon(true);
1710      t.start();
1711    }
1712  }
1713
1714  private void updateCacheIndex(BucketCacheProtos.BackingMap chunk,
1715    java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException {
1716    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
1717      BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler);
1718    backingMap.putAll(pair2.getFirst());
1719    blocksByHFile.addAll(pair2.getSecond());
1720  }
1721
1722  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1723    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
1724      BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1725        this::createRecycler);
1726    backingMap = pair.getFirst();
1727    blocksByHFile = pair.getSecond();
1728    fullyCachedFiles.clear();
1729    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
1730
1731    LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(),
1732      fullyCachedFiles.size());
1733
1734    verifyFileIntegrity(proto);
1735    updateRegionSizeMapWhileRetrievingFromFile();
1736    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1737  }
1738
1739  private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
1740    LOG.debug(
1741      "persistToFile: before persisting backing map size: {}, "
1742        + "fullycachedFiles size: {}, chunkSize: {}",
1743      backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize);
1744
1745    BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize);
1746
1747    LOG.debug(
1748      "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}",
1749      backingMap.size(), fullyCachedFiles.size());
1750  }
1751
1752  private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {
1753
1754    // Read the first chunk that has all the details.
1755    BucketCacheProtos.BucketCacheEntry cacheEntry =
1756      BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
1757
1758    fullyCachedFiles.clear();
1759    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap()));
1760
1761    backingMap.clear();
1762    blocksByHFile.clear();
1763
1764    // Read the backing map entries in batches.
1765    int numChunks = 0;
1766    while (in.available() > 0) {
1767      updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
1768        cacheEntry.getDeserializersMap());
1769      numChunks++;
1770    }
1771
1772    LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size());
1773    verifyFileIntegrity(cacheEntry);
1774    verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(),
1775      cacheEntry.getMapClass());
1776    updateRegionSizeMapWhileRetrievingFromFile();
1777  }
1778
1779  /**
1780   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1781   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1782   */
1783  private void checkIOErrorIsTolerated() {
1784    long now = EnvironmentEdgeManager.currentTime();
1785    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1786    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1787    if (ioErrorStartTimeTmp > 0) {
1788      if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1789        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1790          + "ms, disabling cache, please check your IOEngine");
1791        disableCache();
1792      }
1793    } else {
1794      this.ioErrorStartTime = now;
1795    }
1796  }
1797
1798  /**
1799   * Used to shut down the cache -or- turn it off in the case of something broken.
1800   */
1801  private void disableCache() {
1802    if (!isCacheEnabled()) {
1803      return;
1804    }
1805    LOG.info("Disabling cache");
1806    cacheState = CacheState.DISABLED;
1807    ioEngine.shutdown();
1808    this.scheduleThreadPool.shutdown();
1809    for (int i = 0; i < writerThreads.length; ++i)
1810      writerThreads[i].interrupt();
1811    this.ramCache.clear();
1812    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Not a persistent ioengine with a path (in which case we would keep the backingMap and
      // serialize it out), so clear the in-memory structures.
1814      this.backingMap.clear();
1815      this.blocksByHFile.clear();
1816      this.fullyCachedFiles.clear();
1817      this.regionCachedSize.clear();
1818    }
1819    if (cacheStats.getMetricsRollerScheduler() != null) {
1820      cacheStats.getMetricsRollerScheduler().shutdownNow();
1821    }
1822  }
1823
1824  private void join() throws InterruptedException {
1825    for (int i = 0; i < writerThreads.length; ++i)
1826      writerThreads[i].join();
1827  }
1828
1829  @Override
1830  public void shutdown() {
1831    disableCache();
1832    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1833      + persistencePath);
1834    if (ioEngine.isPersistent() && persistencePath != null) {
1835      try {
1836        join();
1837        if (cachePersister != null) {
1838          LOG.info("Shutting down cache persister thread.");
1839          cachePersister.shutdown();
1840          while (cachePersister.isAlive()) {
1841            Thread.sleep(10);
1842          }
1843        }
1844        persistToFile();
1845      } catch (IOException ex) {
1846        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1847      } catch (InterruptedException e) {
1848        LOG.warn("Failed to persist data on exit", e);
1849      }
1850    }
1851  }
1852
1853  /**
1854   * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
1855   * on different UT methods.
1856   */
1857  @Override
1858  protected void finalize() {
1859    if (cachePersister != null && !cachePersister.isInterrupted()) {
1860      cachePersister.interrupt();
1861    }
1862  }
1863
1864  @Override
1865  public CacheStats getStats() {
1866    return cacheStats;
1867  }
1868
1869  public BucketAllocator getAllocator() {
1870    return this.bucketAllocator;
1871  }
1872
1873  @Override
1874  public long heapSize() {
1875    return this.heapSize.sum();
1876  }
1877
1878  @Override
1879  public long size() {
1880    return this.realCacheSize.sum();
1881  }
1882
1883  @Override
1884  public long getCurrentDataSize() {
1885    return size();
1886  }
1887
1888  @Override
1889  public long getFreeSize() {
1890    if (!isCacheInitialized("BucketCache:getFreeSize")) {
1891      return 0;
1892    }
1893    return this.bucketAllocator.getFreeSize();
1894  }
1895
1896  @Override
1897  public long getBlockCount() {
1898    return this.blockNumber.sum();
1899  }
1900
1901  @Override
1902  public long getDataBlockCount() {
1903    return getBlockCount();
1904  }
1905
1906  @Override
1907  public long getCurrentSize() {
1908    if (!isCacheInitialized("BucketCache::getCurrentSize")) {
1909      return 0;
1910    }
1911    return this.bucketAllocator.getUsedSize();
1912  }
1913
1914  protected String getAlgorithm() {
1915    return algorithm;
1916  }
1917
1918  /**
1919   * Evicts all blocks for a specific HFile.
1920   * <p>
1921   * This is used for evict-on-close to remove all blocks of a specific HFile.
1922   * @return the number of blocks evicted
1923   */
1924  @Override
1925  public int evictBlocksByHfileName(String hfileName) {
1926    return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE);
1927  }
1928
1929  @Override
1930  public int evictBlocksByHfilePath(Path hfilePath) {
1931    return evictBlocksRangeByHfileName(hfilePath.getName(), hfilePath, 0, Long.MAX_VALUE);
1932  }
1933
1934  public int evictBlocksRangeByHfileName(String hfileName, Path filePath, long initOffset,
1935    long endOffset) {
1936    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset);
1937    LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(),
1938      hfileName, initOffset, endOffset);
1939    int numEvicted = 0;
1940    for (BlockCacheKey key : keySet) {
1941      if (filePath != null) {
1942        key.setFilePath(filePath);
1943      }
1944      if (evictBlock(key)) {
1945        ++numEvicted;
1946      }
1947    }
1948    return numEvicted;
1949  }
1950
1951  @Override
1952  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
1953    return evictBlocksRangeByHfileName(hfileName, null, initOffset, endOffset);
1954  }
1955
1956  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) {
1957    return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
1958      new BlockCacheKey(hfileName, end), true);
1959  }
1960
1961  /**
1962   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1963   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1964   * number of elements out of each according to configuration parameters and their relative sizes.
1965   */
1966  private class BucketEntryGroup {
1967
1968    private CachedEntryQueue queue;
1969    private long totalSize = 0;
1970    private long bucketSize;
1971
1972    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1973      this.bucketSize = bucketSize;
1974      queue = new CachedEntryQueue(bytesToFree, blockSize);
1975      totalSize = 0;
1976    }
1977
1978    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1979      totalSize += block.getValue().getLength();
1980      queue.add(block);
1981    }
1982
1983    public long free(long toFree) {
1984      Map.Entry<BlockCacheKey, BucketEntry> entry;
1985      long freedBytes = 0;
      // TODO avoid a cycling situation. We find no block which is not in use and so have no way
      // to free. What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1988      while ((entry = queue.pollLast()) != null) {
1989        BlockCacheKey blockCacheKey = entry.getKey();
1990        BucketEntry be = entry.getValue();
1991        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1992          freedBytes += be.getLength();
1993        }
1994        if (freedBytes >= toFree) {
1995          return freedBytes;
1996        }
1997      }
1998      return freedBytes;
1999    }
2000
2001    public long overflow() {
2002      return totalSize - bucketSize;
2003    }
2004
2005    public long totalSize() {
2006      return totalSize;
2007    }
2008  }
2009
2010  /**
2011   * Block Entry stored in the memory with key,data and so on
2012   */
2013  static class RAMQueueEntry {
2014    private final BlockCacheKey key;
2015    private final Cacheable data;
2016    private long accessCounter;
2017    private boolean inMemory;
2018    private boolean isCachePersistent;
2019
2020    private boolean isPrefetch;
2021
2022    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
2023      boolean isCachePersistent, boolean isPrefetch) {
2024      this.key = bck;
2025      this.data = data;
2026      this.accessCounter = accessCounter;
2027      this.inMemory = inMemory;
2028      this.isCachePersistent = isCachePersistent;
2029      this.isPrefetch = isPrefetch;
2030    }
2031
2032    public Cacheable getData() {
2033      return data;
2034    }
2035
2036    public BlockCacheKey getKey() {
2037      return key;
2038    }
2039
2040    public boolean isPrefetch() {
2041      return isPrefetch;
2042    }
2043
2044    public void access(long accessCounter) {
2045      this.accessCounter = accessCounter;
2046    }
2047
2048    private ByteBuffAllocator getByteBuffAllocator() {
2049      if (data instanceof HFileBlock) {
2050        return ((HFileBlock) data).getByteBuffAllocator();
2051      }
2052      return ByteBuffAllocator.HEAP;
2053    }
2054
2055    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
2056      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
2057      ByteBuffer metaBuff, final Long acceptableSize) throws IOException {
2058      int len = data.getSerializedLength();
2059      // This cacheable thing can't be serialized
2060      if (len == 0) {
2061        return null;
2062      }
2063      if (isCachePersistent && data instanceof HFileBlock) {
2064        len += Long.BYTES; // we need to record the cache time for consistency check in case of
2065                           // recovery
2066      }
2067      long offset = alloc.allocateBlock(len);
2068      // In the case of prefetch, we want to avoid freeSpace runs when the cache is full.
2069      // this makes the cache allocation more predictable, and is particularly important
2070      // when persistent cache is enabled, as it won't trigger evictions of the recovered blocks,
2071      // which are likely the most accessed and relevant blocks in the cache.
2072      if (isPrefetch() && alloc.getUsedSize() > acceptableSize) {
2073        alloc.freeBlock(offset, len);
2074        return null;
2075      }
2076      boolean succ = false;
2077      BucketEntry bucketEntry = null;
2078      try {
2079        int diskSizeWithHeader = (data instanceof HFileBlock)
2080          ? ((HFileBlock) data).getOnDiskSizeWithHeader()
2081          : data.getSerializedLength();
2082        bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory,
2083          createRecycler, getByteBuffAllocator());
2084        bucketEntry.setDeserializerReference(data.getDeserializer());
2085        if (data instanceof HFileBlock) {
2086          // If an instance of HFileBlock, save on some allocations.
2087          HFileBlock block = (HFileBlock) data;
2088          ByteBuff sliceBuf = block.getBufferReadOnly();
2089          block.getMetaData(metaBuff);
2090          // adds the cache time prior to the block and metadata part
2091          if (isCachePersistent) {
2092            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
2093            buffer.putLong(bucketEntry.getCachedTime());
2094            buffer.rewind();
2095            ioEngine.write(buffer, offset);
2096            ioEngine.write(sliceBuf, (offset + Long.BYTES));
2097          } else {
2098            ioEngine.write(sliceBuf, offset);
2099          }
2100          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
2101        } else {
2102          // Only used for testing.
2103          ByteBuffer bb = ByteBuffer.allocate(len);
2104          data.serialize(bb, true);
2105          ioEngine.write(bb, offset);
2106        }
2107        succ = true;
2108      } finally {
2109        if (!succ) {
2110          alloc.freeBlock(offset, len);
2111        }
2112      }
2113      realCacheSize.add(len);
2114      return bucketEntry;
2115    }
2116  }
2117
2118  /**
2119   * Only used in test
2120   */
2121  void stopWriterThreads() throws InterruptedException {
2122    for (WriterThread writerThread : writerThreads) {
2123      writerThread.disableWriter();
2124      writerThread.interrupt();
2125      writerThread.join();
2126    }
2127  }
2128
2129  @Override
2130  public Iterator<CachedBlock> iterator() {
2131    // Don't bother with ramcache since stuff is in here only a little while.
2132    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
2133    return new Iterator<CachedBlock>() {
2134      private final long now = System.nanoTime();
2135
2136      @Override
2137      public boolean hasNext() {
2138        return i.hasNext();
2139      }
2140
2141      @Override
2142      public CachedBlock next() {
2143        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
2144        return new CachedBlock() {
2145          @Override
2146          public String toString() {
2147            return BlockCacheUtil.toString(this, now);
2148          }
2149
2150          @Override
2151          public BlockPriority getBlockPriority() {
2152            return e.getValue().getPriority();
2153          }
2154
2155          @Override
2156          public BlockType getBlockType() {
2157            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
2158            return null;
2159          }
2160
2161          @Override
2162          public long getOffset() {
2163            return e.getKey().getOffset();
2164          }
2165
2166          @Override
2167          public long getSize() {
2168            return e.getValue().getLength();
2169          }
2170
2171          @Override
2172          public long getCachedTime() {
2173            return e.getValue().getCachedTime();
2174          }
2175
2176          @Override
2177          public String getFilename() {
2178            return e.getKey().getHfileName();
2179          }
2180
2181          @Override
2182          public int compareTo(CachedBlock other) {
2183            int diff = this.getFilename().compareTo(other.getFilename());
2184            if (diff != 0) return diff;
2185
2186            diff = Long.compare(this.getOffset(), other.getOffset());
2187            if (diff != 0) return diff;
2188            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
2189              throw new IllegalStateException(
2190                "" + this.getCachedTime() + ", " + other.getCachedTime());
2191            }
2192            return Long.compare(other.getCachedTime(), this.getCachedTime());
2193          }
2194
2195          @Override
2196          public int hashCode() {
2197            return e.getKey().hashCode();
2198          }
2199
2200          @Override
2201          public boolean equals(Object obj) {
2202            if (obj instanceof CachedBlock) {
2203              CachedBlock cb = (CachedBlock) obj;
2204              return compareTo(cb) == 0;
2205            } else {
2206              return false;
2207            }
2208          }
2209        };
2210      }
2211
2212      @Override
2213      public void remove() {
2214        throw new UnsupportedOperationException();
2215      }
2216    };
2217  }
2218
2219  @Override
2220  public BlockCache[] getBlockCaches() {
2221    return null;
2222  }
2223
2224  public int getRpcRefCount(BlockCacheKey cacheKey) {
2225    BucketEntry bucketEntry = backingMap.get(cacheKey);
2226    if (bucketEntry != null) {
2227      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
2228    }
2229    return 0;
2230  }
2231
2232  float getAcceptableFactor() {
2233    return acceptableFactor;
2234  }
2235
2236  float getMinFactor() {
2237    return minFactor;
2238  }
2239
2240  float getExtraFreeFactor() {
2241    return extraFreeFactor;
2242  }
2243
2244  float getSingleFactor() {
2245    return singleFactor;
2246  }
2247
2248  float getMultiFactor() {
2249    return multiFactor;
2250  }
2251
2252  float getMemoryFactor() {
2253    return memoryFactor;
2254  }
2255
2256  long getQueueAdditionWaitTime() {
2257    return queueAdditionWaitTime;
2258  }
2259
2260  long getPersistenceChunkSize() {
2261    return persistenceChunkSize;
2262  }
2263
2264  long getBucketcachePersistInterval() {
2265    return bucketcachePersistInterval;
2266  }
2267
2268  public String getPersistencePath() {
2269    return persistencePath;
2270  }
2271
2272  /**
2273   * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
2274   */
2275  static class RAMCache {
2276    /**
2277     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
2278     * {@link RAMCache#get(BlockCacheKey)} and
2279     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee
2280     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
2281     * func method can execute exactly once only when the key is present(or absent) and under the
2282     * lock context. Otherwise, the reference count of block will be messed up. Notice that the
2283     * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
2284     */
2285    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
2286
2287    public boolean containsKey(BlockCacheKey key) {
2288      return delegate.containsKey(key);
2289    }
2290
2291    public RAMQueueEntry get(BlockCacheKey key) {
2292      return delegate.computeIfPresent(key, (k, re) -> {
        // It'll be referenced by RPC, so retain atomically here. If the get and retain are not
        // atomic, another thread may remove and release the block; when retaining in this thread
        // we may then retain a block with refCnt=0, which is disallowed. (see HBASE-22422)
2296        re.getData().retain();
2297        return re;
2298      });
2299    }
2300
2301    /**
2302     * Return the previous associated value, or null if absent. It has the same meaning as
2303     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
2304     */
2305    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
2306      AtomicBoolean absent = new AtomicBoolean(false);
2307      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
        // The RAMCache will reference this entry, so its reference count should be incremented.
2309        entry.getData().retain();
2310        absent.set(true);
2311        return entry;
2312      });
2313      return absent.get() ? null : re;
2314    }
2315
2316    public boolean remove(BlockCacheKey key) {
2317      return remove(key, re -> {
2318      });
2319    }
2320
2321    /**
2322     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
2323     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
2324     * exception. the consumer will access entry to remove before release its reference count.
2325     * Notice, don't change its reference count in the {@link Consumer}
2326     */
2327    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
2328      RAMQueueEntry previous = delegate.remove(key);
2329      action.accept(previous);
2330      if (previous != null) {
2331        previous.getData().release();
2332      }
2333      return previous != null;
2334    }
2335
2336    public boolean isEmpty() {
2337      return delegate.isEmpty();
2338    }
2339
2340    public void clear() {
2341      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
2342      while (it.hasNext()) {
2343        RAMQueueEntry re = it.next().getValue();
2344        it.remove();
2345        re.getData().release();
2346      }
2347    }
2348
2349    public boolean hasBlocksForFile(String fileName) {
2350      return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName))
2351        .findFirst().isPresent();
2352    }
2353  }
2354
2355  public Map<BlockCacheKey, BucketEntry> getBackingMap() {
2356    return backingMap;
2357  }
2358
2359  public AtomicBoolean getBackingMapValidated() {
2360    return backingMapValidated;
2361  }
2362
2363  @Override
2364  public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
2365    return Optional.of(fullyCachedFiles);
2366  }
2367
2368  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
2369    if (cacheConf.getBlockCache().isPresent()) {
2370      BlockCache bc = cacheConf.getBlockCache().get();
2371      if (bc instanceof CombinedBlockCache) {
2372        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
2373        if (l2 instanceof BucketCache) {
2374          return Optional.of((BucketCache) l2);
2375        }
2376      } else if (bc instanceof BucketCache) {
2377        return Optional.of((BucketCache) bc);
2378      }
2379    }
2380    return Optional.empty();
2381  }
2382
2383  @Override
2384  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
2385    long size) {
2386    // block eviction may be happening in the background as prefetch runs,
2387    // so we need to count all blocks for this file in the backing map under
2388    // a read lock for the block offset
2389    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
2390    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
2391      fileName, totalBlockCount, dataBlockCount);
2392    try {
2393      final MutableInt count = new MutableInt();
2394      LOG.debug("iterating over {} entries in the backing map", backingMap.size());
2395      Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE);
2396      if (result.isEmpty() && StoreFileInfo.isReference(fileName)) {
2397        result = getAllCacheKeysForFile(
2398          StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0,
2399          Long.MAX_VALUE);
2400      }
2401      result.stream().forEach(entry -> {
2402        LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
2403          fileName.getName(), entry.getOffset());
2404        ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset());
2405        lock.readLock().lock();
2406        locks.add(lock);
2407        if (backingMap.containsKey(entry) && entry.getBlockType().isData()) {
2408          count.increment();
2409        }
2410      });
2411      // BucketCache would only have data blocks
2412      if (dataBlockCount == count.getValue()) {
2413        LOG.debug("File {} has now been fully cached.", fileName);
2414        fileCacheCompleted(fileName, size);
2415      } else {
2416        LOG.debug(
2417          "Prefetch executor completed for {}, but only {} data blocks were cached. "
2418            + "Total data blocks for file: {}. "
2419            + "Checking for blocks pending cache in cache writer queue.",
2420          fileName, count.getValue(), dataBlockCount);
2421        if (ramCache.hasBlocksForFile(fileName.getName())) {
2422          for (ReentrantReadWriteLock lock : locks) {
2423            lock.readLock().unlock();
2424          }
2425          locks.clear();
2426          LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
2427            + "and try the verification again.", fileName);
2428          Thread.sleep(100);
2429          notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
2430        } else {
2431          LOG.info(
2432            "The total block count was {}. We found only {} data blocks cached from "
2433              + "a total of {} data blocks for file {}, "
2434              + "but no blocks pending caching. Maybe cache is full or evictions "
2435              + "happened concurrently to cache prefetch.",
2436            totalBlockCount, count, dataBlockCount, fileName);
2437        }
2438      }
2439    } catch (InterruptedException e) {
2440      throw new RuntimeException(e);
2441    } finally {
2442      for (ReentrantReadWriteLock lock : locks) {
2443        lock.readLock().unlock();
2444      }
2445    }
2446  }
2447
2448  @Override
2449  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
2450    if (!isCacheInitialized("blockFitsIntoTheCache")) {
2451      return Optional.of(false);
2452    }
2453
2454    long currentUsed = bucketAllocator.getUsedSize();
2455    boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
2456    return Optional.of(result);
2457  }
2458
2459  @Override
2460  public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
2461    String fileName = hFileInfo.getHFileContext().getHFileName();
2462    DataTieringManager dataTieringManager = DataTieringManager.getInstance();
2463    if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) {
2464      LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
2465      return Optional.of(false);
2466    }
2467    // if we don't have the file in fullyCachedFiles, we should cache it
2468    return Optional.of(!fullyCachedFiles.containsKey(fileName));
2469  }
2470
2471  @Override
2472  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
2473    Configuration conf) {
2474    DataTieringManager dataTieringManager = DataTieringManager.getInstance();
2475    if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) {
2476      LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
2477        key.getHfileName());
2478      return Optional.of(false);
2479    }
2480    return Optional.of(true);
2481  }
2482
2483  @Override
2484  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
2485    boolean foundKey = backingMap.containsKey(key);
2486    // if there's no entry for the key itself, we need to check if this key is for a reference,
2487    // and if so, look for a block from the referenced file using this getBlockForReference method.
2488    return Optional.of(foundKey ? true : getBlockForReference(key) != null);
2489  }
2490
2491  @Override
2492  public Optional<Integer> getBlockSize(BlockCacheKey key) {
2493    BucketEntry entry = backingMap.get(key);
2494    if (entry == null) {
      // the key might be for a reference, in which case we may have found the block from the
      // referenced file in the cache when we first tried to cache it.
2497      entry = getBlockForReference(key);
2498      return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader());
2499    } else {
2500      return Optional.of(entry.getOnDiskSizeWithHeader());
2501    }
2502
2503  }
2504
2505  boolean isCacheInitialized(String api) {
2506    if (cacheState == CacheState.INITIALIZING) {
2507      LOG.warn("Bucket initialisation pending at {}", api);
2508      return false;
2509    }
2510    return true;
2511  }
2512
2513  @Override
2514  public boolean waitForCacheInitialization(long timeout) {
2515    try {
2516      while (cacheState == CacheState.INITIALIZING) {
2517        if (timeout <= 0) {
2518          break;
2519        }
2520        Thread.sleep(100);
2521        timeout -= 100;
2522      }
2523    } finally {
2524      return isCacheEnabled();
2525    }
2526  }
2527}