001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIODS; 021import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIOD_MINUTES_KEY; 022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIODS; 023import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 025 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.FileOutputStream; 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.Comparator; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.NavigableSet; 040import java.util.Optional; 041import java.util.PriorityQueue; 042import java.util.Set; 043import java.util.concurrent.ArrayBlockingQueue; 044import java.util.concurrent.BlockingQueue; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.concurrent.ConcurrentSkipListSet; 048import java.util.concurrent.Executors; 049import java.util.concurrent.ScheduledExecutorService; 050import java.util.concurrent.TimeUnit; 051import java.util.concurrent.atomic.AtomicBoolean; 052import java.util.concurrent.atomic.AtomicLong; 053import java.util.concurrent.atomic.LongAdder; 054import java.util.concurrent.locks.Lock; 055import java.util.concurrent.locks.ReentrantLock; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import java.util.function.Consumer; 058import java.util.function.Function; 059import org.apache.commons.io.IOUtils; 060import org.apache.commons.lang3.mutable.MutableInt; 061import org.apache.hadoop.conf.Configuration; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.hbase.HBaseConfiguration; 064import org.apache.hadoop.hbase.HBaseIOException; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.io.ByteBuffAllocator; 068import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 069import org.apache.hadoop.hbase.io.HeapSize; 070import org.apache.hadoop.hbase.io.hfile.BlockCache; 071import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 072import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 073import org.apache.hadoop.hbase.io.hfile.BlockPriority; 074import org.apache.hadoop.hbase.io.hfile.BlockType; 075import 
org.apache.hadoop.hbase.io.hfile.CacheConfig; 076import org.apache.hadoop.hbase.io.hfile.CacheStats; 077import org.apache.hadoop.hbase.io.hfile.Cacheable; 078import org.apache.hadoop.hbase.io.hfile.CachedBlock; 079import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 080import org.apache.hadoop.hbase.io.hfile.HFileBlock; 081import org.apache.hadoop.hbase.io.hfile.HFileContext; 082import org.apache.hadoop.hbase.nio.ByteBuff; 083import org.apache.hadoop.hbase.nio.RefCnt; 084import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 085import org.apache.hadoop.hbase.regionserver.HRegion; 086import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.IdReadWriteLock; 090import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 091import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 092import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 093import org.apache.hadoop.hbase.util.Pair; 094import org.apache.hadoop.util.StringUtils; 095import org.apache.yetus.audience.InterfaceAudience; 096import org.slf4j.Logger; 097import org.slf4j.LoggerFactory; 098 099import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 100import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 101 102import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 103 104/** 105 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 106 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 107 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 108 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 109 * {@link FileIOEngine} to store/read the block data. 110 * <p> 111 * Eviction is via a similar algorithm as used in 112 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 113 * <p> 114 * BucketCache can be used as mainly a block cache (see 115 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 116 * decrease CMS GC and heap fragmentation. 117 * <p> 118 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 119 * enlarge cache space via a victim cache. 
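 * <p>
 * A minimal construction sketch, for illustration only: in a real deployment the cache is
 * normally wired up by CacheConfig/BlockCacheFactory from the hbase.bucketcache.* properties
 * (e.g. hbase.bucketcache.ioengine and hbase.bucketcache.size); the sizes below are arbitrary
 * example values.
 *
 * <pre>{@code
 * // 1 GB off-heap bucket cache, 64 KB approximate block size, 3 writer threads with
 * // 64-entry queues, and no persistence path (nothing survives a restart).
 * BucketCache cache =
 *   new BucketCache("offheap", 1024L * 1024 * 1024, 64 * 1024, null, 3, 64, null);
 * }</pre>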
120 */ 121@InterfaceAudience.Private 122public class BucketCache implements BlockCache, HeapSize { 123 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 124 125 /** Priority buckets config */ 126 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 127 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 128 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 129 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 130 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 131 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 132 static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 133 "hbase.bucketcache.persistence.chunksize"; 134 135 /** Use strong reference for offsetLock or not */ 136 private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref"; 137 private static final boolean STRONG_REF_DEFAULT = false; 138 139 /** Minimum age of a cached block before we check whether its file is still present in any online region (orphan block eviction grace period). */ 140 static final String BLOCK_ORPHAN_GRACE_PERIOD = 141 "hbase.bucketcache.block.orphan.evictgraceperiod.seconds"; 142 143 static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L; 144 145 /** Priority buckets */ 146 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 147 static final float DEFAULT_MULTI_FACTOR = 0.50f; 148 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 149 static final float DEFAULT_MIN_FACTOR = 0.85f; 150 151 static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 152 static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 153 154 // Number of buckets to free entirely for each bucket size that is full 155 static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 156 157 /** Statistics thread */ 158 private static final int statThreadPeriod = 5 * 60; 159 160 final static int DEFAULT_WRITER_THREADS = 3; 161 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 162 163 final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000; 164 165 // Store/read block data 166 transient final IOEngine ioEngine; 167 168 // Store the block in this map before writing it to cache 169 transient final RAMCache ramCache; 170 171 // In this map, store the block's meta data like offset, length 172 transient Map<BlockCacheKey, BucketEntry> backingMap; 173 174 private AtomicBoolean backingMapValidated = new AtomicBoolean(false); 175 176 /** 177 * Map of hFile name -> (region name, file size). This map tracks all files that have completed 178 * prefetch, together with the region they belong to and the total size cached for that 179 * region. 180 */ 181 transient final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); 182 /** 183 * Map of region name -> total size prefetched for that region on this region server, i.e. the 184 * total size of the region's hFiles prefetched on this region server. 185 */ 186 final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>(); 187 188 private transient BucketCachePersister cachePersister; 189 190 /** 191 * Enum to represent the state of the cache 192 */ 193 protected enum CacheState { 194 // Initializing: State when the cache is being initialised from persistence. 195 INITIALIZING, 196 // Enabled: State when cache is initialised and is ready. 197 ENABLED, 198 // Disabled: State when the cache is disabled.
199 DISABLED 200 } 201 202 /** 203 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 204 * that Bucket IO exceptions/errors don't bring down the HBase server. 205 */ 206 private volatile CacheState cacheState; 207 208 /** 209 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 210 * words, the work adding blocks to the BucketCache is divided up amongst the running 211 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 212 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 213 * then updates the ramCache and backingMap accordingly. 214 */ 215 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 216 transient final WriterThread[] writerThreads; 217 218 /** Volatile boolean to track if free space is in process or not */ 219 private volatile boolean freeInProgress = false; 220 private transient final Lock freeSpaceLock = new ReentrantLock(); 221 222 private final LongAdder realCacheSize = new LongAdder(); 223 private final LongAdder heapSize = new LongAdder(); 224 /** Current number of cached elements */ 225 private final LongAdder blockNumber = new LongAdder(); 226 227 /** Cache access count (sequential ID) */ 228 private final AtomicLong accessCount = new AtomicLong(); 229 230 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 231 232 private final BucketCacheStats cacheStats; 233 private final String persistencePath; 234 static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); 235 private final long cacheCapacity; 236 /** Approximate block size */ 237 private final long blockSize; 238 239 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 240 private final int ioErrorsTolerationDuration; 241 // 1 min 242 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 243 244 // Start time of first IO error when reading or writing IO Engine, it will be 245 // reset after a successful read/write. 246 private volatile long ioErrorStartTime = -1; 247 248 private transient Configuration conf; 249 250 /** 251 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 252 * this is to avoid freeing the block which is being read. 253 * <p> 254 */ 255 transient final IdReadWriteLock<Long> offsetLock; 256 257 transient NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>( 258 Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 259 260 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 261 private transient final ScheduledExecutorService scheduleThreadPool = 262 Executors.newScheduledThreadPool(1, 263 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 264 265 // Allocate or free space for the block 266 private transient BucketAllocator bucketAllocator; 267 268 /** Acceptable size of cache (no evictions if size < acceptable) */ 269 private float acceptableFactor; 270 271 /** Minimum threshold of cache (when evicting, evict until size < min) */ 272 private float minFactor; 273 274 /** 275 * Free this floating point factor of extra blocks when evicting. 
For example free the number of 276 * blocks requested * (1 + extraFreeFactor) 277 */ 278 private float extraFreeFactor; 279 280 /** Single access bucket size */ 281 private float singleFactor; 282 283 /** Multiple access bucket size */ 284 private float multiFactor; 285 286 /** In-memory bucket size */ 287 private float memoryFactor; 288 289 private long bucketcachePersistInterval; 290 291 private static final String FILE_VERIFY_ALGORITHM = 292 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 293 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 294 295 static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime"; 296 private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; 297 private long queueAdditionWaitTime; 298 /** 299 * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file 300 * integrity, the default algorithm is MD5 301 */ 302 private String algorithm; 303 304 private long persistenceChunkSize; 305 306 /* Tracing failed Bucket Cache allocations. */ 307 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 308 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 309 310 private transient Map<String, HRegion> onlineRegions; 311 312 private long orphanBlockGracePeriod = 0; 313 314 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 315 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 316 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 317 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 318 } 319 320 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 321 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 322 Configuration conf) throws IOException { 323 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 324 persistencePath, ioErrorsTolerationDuration, conf, null); 325 } 326 327 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 328 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 329 Configuration conf, Map<String, HRegion> onlineRegions) throws IOException { 330 Preconditions.checkArgument(blockSize > 0, 331 "BucketCache block size is set to " + blockSize + ", it must be greater than 0"); 332 boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT); 333 if (useStrongRef) { 334 this.offsetLock = new IdReadWriteLockStrongRef<>(); 335 } else { 336 this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); 337 } 338 this.conf = conf; 339 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 340 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 341 this.writerThreads = new WriterThread[writerThreadNum]; 342 this.onlineRegions = onlineRegions; 343 this.orphanBlockGracePeriod = 344 conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT); 345 long blockNumCapacity = capacity / blockSize; 346 if (blockNumCapacity >= Integer.MAX_VALUE) { 347 // Enough for about 32TB of cache!
348 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 349 } 350 351 // these sets the dynamic configs 352 this.onConfigurationChange(conf); 353 this.cacheStats = 354 new BucketCacheStats(conf.getInt(BLOCKCACHE_STATS_PERIODS, DEFAULT_BLOCKCACHE_STATS_PERIODS), 355 conf.getInt(BLOCKCACHE_STATS_PERIOD_MINUTES_KEY, DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES)); 356 357 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 358 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 359 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 360 + ", useStrongRef: " + useStrongRef); 361 362 this.cacheCapacity = capacity; 363 this.persistencePath = persistencePath; 364 this.blockSize = blockSize; 365 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 366 this.cacheState = CacheState.INITIALIZING; 367 368 this.allocFailLogPrevTs = 0; 369 370 for (int i = 0; i < writerThreads.length; ++i) { 371 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 372 } 373 374 assert writerQueues.size() == writerThreads.length; 375 this.ramCache = new RAMCache(); 376 377 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 378 instantiateWriterThreads(); 379 380 if (isCachePersistent()) { 381 if (ioEngine instanceof FileIOEngine) { 382 startBucketCachePersisterThread(); 383 } 384 startPersistenceRetriever(bucketSizes, capacity); 385 } else { 386 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 387 this.cacheState = CacheState.ENABLED; 388 startWriterThreads(); 389 } 390 391 // Run the statistics thread periodically to print the cache statistics log 392 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 393 // every five minutes. 394 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 395 statThreadPeriod, TimeUnit.SECONDS); 396 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 397 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 398 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 399 + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); 400 } 401 402 private void startPersistenceRetriever(int[] bucketSizes, long capacity) { 403 Runnable persistentCacheRetriever = () -> { 404 try { 405 retrieveFromFile(bucketSizes); 406 LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); 407 } catch (Throwable ex) { 408 LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt." 
409 + " Exception seen: ", persistencePath, ex); 410 backingMap.clear(); 411 fullyCachedFiles.clear(); 412 backingMapValidated.set(true); 413 regionCachedSize.clear(); 414 try { 415 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 416 } catch (BucketAllocatorException allocatorException) { 417 LOG.error("Exception during Bucket Allocation", allocatorException); 418 } 419 } finally { 420 this.cacheState = CacheState.ENABLED; 421 startWriterThreads(); 422 } 423 }; 424 Thread t = new Thread(persistentCacheRetriever); 425 t.start(); 426 } 427 428 private void sanityCheckConfigs() { 429 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 430 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 431 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 432 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 433 Preconditions.checkArgument(minFactor <= acceptableFactor, 434 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 435 Preconditions.checkArgument(extraFreeFactor >= 0, 436 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 437 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 438 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 439 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 440 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 441 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 442 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 443 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 444 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 445 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 446 if (this.persistenceChunkSize <= 0) { 447 persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 448 } 449 } 450 451 /** 452 * Called by the constructor to instantiate the writer threads. 453 */ 454 private void instantiateWriterThreads() { 455 final String threadName = Thread.currentThread().getName(); 456 for (int i = 0; i < this.writerThreads.length; ++i) { 457 this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); 458 this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 459 this.writerThreads[i].setDaemon(true); 460 } 461 } 462 463 /** 464 * Called by the constructor to start the writer threads. Used by tests that need to override 465 * starting the threads. 
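 * <p>
 * A hypothetical override sketch for such a test (constructor arguments are arbitrary example
 * values; the test is assumed to drain the writer queues itself, e.g. by calling doDrain
 * directly):
 *
 * <pre>{@code
 * BucketCache cache = new BucketCache("offheap", 64L * 1024 * 1024, 8192, null, 1, 16, null) {
 *   protected void startWriterThreads() {
 *     // no-op: keep the writer threads parked so the test controls when queues are drained
 *   }
 * };
 * }</pre>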
466 */ 467 protected void startWriterThreads() { 468 for (WriterThread thread : writerThreads) { 469 thread.start(); 470 } 471 } 472 473 void startBucketCachePersisterThread() { 474 LOG.info("Starting BucketCachePersisterThread"); 475 cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); 476 cachePersister.setDaemon(true); 477 cachePersister.start(); 478 } 479 480 @Override 481 public boolean isCacheEnabled() { 482 return this.cacheState == CacheState.ENABLED; 483 } 484 485 @Override 486 public long getMaxSize() { 487 return this.cacheCapacity; 488 } 489 490 public String getIoEngine() { 491 return ioEngine.toString(); 492 } 493 494 /** 495 * Get the IOEngine from the IO engine name 496 * @return the IOEngine 497 */ 498 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 499 throws IOException { 500 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 501 // To keep usage simple, the documentation only mentions the 'files:' prefix, whether one 502 // or multiple files are given, but 'file:' is still supported for 503 // compatibility 504 String[] filePaths = 505 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 506 return new FileIOEngine(capacity, persistencePath != null, filePaths); 507 } else if (ioEngineName.startsWith("offheap")) { 508 return new ByteBufferIOEngine(capacity); 509 } else if (ioEngineName.startsWith("mmap:")) { 510 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 511 } else if (ioEngineName.startsWith("pmem:")) { 512 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 513 // device. Since the persistent memory device has its own address space, the contents 514 // mapped to this address space do not get swapped out like in the case of mmapping 515 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 516 // can be referred to directly without having to copy them onheap. Once the RPC is done, 517 // the blocks can be returned back as in the case of ByteBufferIOEngine. 518 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 519 } else { 520 throw new IllegalArgumentException( 521 "Don't understand io engine name for cache - prefix with file:, files:, mmap:, pmem: or offheap"); 522 } 523 } 524 525 public boolean isCachePersistenceEnabled() { 526 return persistencePath != null; 527 } 528 529 /** 530 * Cache the block with the specified name and buffer. 531 * @param cacheKey block's cache key 532 * @param buf block buffer 533 */ 534 @Override 535 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 536 cacheBlock(cacheKey, buf, false); 537 } 538 539 /** 540 * Cache the block with the specified name and buffer. 541 * @param cacheKey block's cache key 542 * @param cachedItem block buffer 543 * @param inMemory if block is in-memory 544 */ 545 @Override 546 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 547 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 548 } 549 550 /** 551 * Cache the block with the specified name and buffer.
552 * @param cacheKey block's cache key 553 * @param cachedItem block buffer 554 * @param inMemory if block is in-memory 555 */ 556 @Override 557 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 558 boolean waitWhenCache) { 559 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 560 } 561 562 /** 563 * Cache the block to ramCache 564 * @param cacheKey block's cache key 565 * @param cachedItem block buffer 566 * @param inMemory if block is in-memory 567 * @param wait if true, blocking wait when queue is full 568 */ 569 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 570 boolean wait) { 571 if (isCacheEnabled()) { 572 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 573 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 574 BucketEntry bucketEntry = backingMap.get(cacheKey); 575 if (bucketEntry != null && bucketEntry.isRpcRef()) { 576 // avoid replace when there are RPC refs for the bucket entry in bucket cache 577 return; 578 } 579 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 580 } 581 } else { 582 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 583 } 584 } 585 } 586 587 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 588 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 589 } 590 591 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 592 boolean inMemory, boolean wait) { 593 if (!isCacheEnabled()) { 594 return; 595 } 596 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 597 cacheKey.setBlockType(cachedItem.getBlockType()); 598 } 599 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 600 // Stuff the entry into the RAM cache so it can get drained to the persistent store 601 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 602 inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait); 603 /** 604 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 605 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 606 * ramCache. 
But WriterThread will also remove entry from ramCache and update heap size, if 607 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 608 * one, then the heap size will mess up (HBASE-20789) 609 */ 610 if (ramCache.putIfAbsent(cacheKey, re) != null) { 611 return; 612 } 613 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 614 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 615 boolean successfulAddition = false; 616 if (wait) { 617 try { 618 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 619 } catch (InterruptedException e) { 620 LOG.error("Thread interrupted: ", e); 621 Thread.currentThread().interrupt(); 622 } 623 } else { 624 successfulAddition = bq.offer(re); 625 } 626 if (!successfulAddition) { 627 LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey); 628 ramCache.remove(cacheKey); 629 cacheStats.failInsert(); 630 } else { 631 this.blockNumber.increment(); 632 this.heapSize.add(cachedItem.heapSize()); 633 } 634 } 635 636 /** 637 * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this 638 * method looks for the block from the referred file, in the cache. If present in the cache, the 639 * block for the referred file is returned, otherwise, this method returns null. It will also 640 * return null if the passed cache key doesn't relate to a reference. 641 * @param key the BlockCacheKey instance to look for in the cache. 642 * @return the cached block from the referred file, null if there's no such block in the cache or 643 * the passed key doesn't relate to a reference. 644 */ 645 public BucketEntry getBlockForReference(BlockCacheKey key) { 646 BucketEntry foundEntry = null; 647 String referredFileName = null; 648 if (StoreFileInfo.isReference(key.getHfileName())) { 649 referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); 650 } 651 if (referredFileName != null) { 652 BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); 653 foundEntry = backingMap.get(convertedCacheKey); 654 LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), 655 convertedCacheKey, foundEntry); 656 } 657 return foundEntry; 658 } 659 660 /** 661 * Get the buffer of the block with the specified key. 
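 * <p>
 * Illustrative lookup sketch (key construction is simplified for the example; real callers go
 * through the HFile reader / {@link CombinedBlockCache} path):
 *
 * <pre>{@code
 * BlockCacheKey key = new BlockCacheKey("example-hfile-name", 0);
 * Cacheable block = cache.getBlock(key, true, false, true);
 * if (block != null) {
 *   try {
 *     // ... use the block ...
 *   } finally {
 *     // drop the reference handed out to the caller; shared-memory IOEngines retain the
 *     // block on read, so forgetting this leaks a reference count
 *     block.release();
 *   }
 * }
 * }</pre>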
662 * @param key block's cache key 663 * @param caching true if the caller caches blocks on cache misses 664 * @param repeat Whether this is a repeat lookup for the same block 665 * @param updateCacheMetrics Whether we should update cache metrics or not 666 * @return buffer of specified cache key, or null if not in cache 667 */ 668 @Override 669 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 670 boolean updateCacheMetrics) { 671 if (!isCacheEnabled()) { 672 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 673 return null; 674 } 675 RAMQueueEntry re = ramCache.get(key); 676 if (re != null) { 677 if (updateCacheMetrics) { 678 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 679 } 680 re.access(accessCount.incrementAndGet()); 681 return re.getData(); 682 } 683 BucketEntry bucketEntry = backingMap.get(key); 684 if (bucketEntry == null) { 685 bucketEntry = getBlockForReference(key); 686 } 687 if (bucketEntry != null) { 688 long start = System.nanoTime(); 689 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 690 try { 691 lock.readLock().lock(); 692 // We can not read here even if backingMap does contain the given key because its offset 693 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 694 // existence here. 695 if ( 696 bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) 697 ) { 698 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 699 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 700 // the same BucketEntry, then all of the three will share the same refCnt. 701 Cacheable cachedBlock = ioEngine.read(bucketEntry); 702 if (ioEngine.usesSharedMemory()) { 703 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 704 // same RefCnt, do retain here, in order to count the number of RPC references 705 cachedBlock.retain(); 706 } 707 // Update the cache statistics. 708 if (updateCacheMetrics) { 709 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 710 cacheStats.ioHit(System.nanoTime() - start); 711 } 712 bucketEntry.access(accessCount.incrementAndGet()); 713 if (this.ioErrorStartTime > 0) { 714 ioErrorStartTime = -1; 715 } 716 return cachedBlock; 717 } 718 } catch (HBaseIOException hioex) { 719 // When using file io engine persistent cache, 720 // the cache map state might differ from the actual cache. 
If we reach this block, 721 // we should remove the cache key entry from the backing map 722 backingMap.remove(key); 723 fullyCachedFiles.remove(key.getHfileName()); 724 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 725 } catch (IOException ioex) { 726 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 727 checkIOErrorIsTolerated(); 728 } finally { 729 lock.readLock().unlock(); 730 } 731 } 732 if (!repeat && updateCacheMetrics) { 733 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 734 } 735 return null; 736 } 737 738 /** 739 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 740 */ 741 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 742 boolean evictedByEvictionProcess) { 743 bucketEntry.markAsEvicted(); 744 blocksByHFile.remove(cacheKey); 745 if (decrementBlockNumber) { 746 this.blockNumber.decrement(); 747 if (ioEngine.isPersistent()) { 748 fileNotFullyCached(cacheKey.getHfileName()); 749 } 750 } 751 if (evictedByEvictionProcess) { 752 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 753 } 754 if (ioEngine.isPersistent()) { 755 setCacheInconsistent(true); 756 } 757 } 758 759 private void fileNotFullyCached(String hfileName) { 760 // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted 761 if (fullyCachedFiles.containsKey(hfileName)) { 762 Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); 763 String regionEncodedName = regionEntry.getFirst(); 764 long filePrefetchSize = regionEntry.getSecond(); 765 LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); 766 regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); 767 // If all the blocks for a region are evicted from the cache, remove the entry for that region 768 if ( 769 regionCachedSize.containsKey(regionEncodedName) 770 && regionCachedSize.get(regionEncodedName) == 0 771 ) { 772 regionCachedSize.remove(regionEncodedName); 773 } 774 } 775 fullyCachedFiles.remove(hfileName); 776 } 777 778 public void fileCacheCompleted(Path filePath, long size) { 779 Pair<String, Long> pair = new Pair<>(); 780 // sets the region name 781 String regionName = filePath.getParent().getParent().getName(); 782 pair.setFirst(regionName); 783 pair.setSecond(size); 784 fullyCachedFiles.put(filePath.getName(), pair); 785 } 786 787 private void updateRegionCachedSize(Path filePath, long cachedSize) { 788 if (filePath != null) { 789 String regionName = filePath.getParent().getParent().getName(); 790 regionCachedSize.merge(regionName, cachedSize, 791 (previousSize, newBlockSize) -> previousSize + newBlockSize); 792 } 793 } 794 795 /** 796 * Free the {{@link BucketEntry} actually,which could only be invoked when the 797 * {@link BucketEntry#refCnt} becoming 0. 798 */ 799 void freeBucketEntry(BucketEntry bucketEntry) { 800 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 801 realCacheSize.add(-1 * bucketEntry.getLength()); 802 } 803 804 /** 805 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 806 * 1. Close an HFile, and clear all cached blocks. <br> 807 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 808 * <p> 809 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 
810 * Here we evict the block from backingMap immediately, but only free the reference from bucket 811 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 812 * block, block can only be de-allocated when all of them release the block. 813 * <p> 814 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 815 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 816 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 817 * @param cacheKey Block to evict 818 * @return true to indicate whether we've evicted successfully or not. 819 */ 820 @Override 821 public boolean evictBlock(BlockCacheKey cacheKey) { 822 return doEvictBlock(cacheKey, null, false); 823 } 824 825 /** 826 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 827 * {@link BucketCache#ramCache}. <br/> 828 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 829 * {@link BucketEntry} could be removed. 830 * @param cacheKey {@link BlockCacheKey} to evict. 831 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 832 * @return true to indicate whether we've evicted successfully or not. 833 */ 834 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 835 boolean evictedByEvictionProcess) { 836 if (!isCacheEnabled()) { 837 return false; 838 } 839 boolean existedInRamCache = removeFromRamCache(cacheKey); 840 if (bucketEntry == null) { 841 bucketEntry = backingMap.get(cacheKey); 842 } 843 final BucketEntry bucketEntryToUse = bucketEntry; 844 845 if (bucketEntryToUse == null) { 846 if (existedInRamCache && evictedByEvictionProcess) { 847 cacheStats.evicted(0, cacheKey.isPrimary()); 848 } 849 return existedInRamCache; 850 } else { 851 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 852 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 853 LOG.debug("removed key {} from back map with offset lock {} in the evict process", 854 cacheKey, bucketEntryToUse.offset()); 855 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 856 return true; 857 } 858 return false; 859 }); 860 } 861 } 862 863 /** 864 * <pre> 865 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 866 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 867 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 868 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 869 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 870 * it is the return value of current {@link BucketCache#createRecycler} method. 871 * 872 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 873 * it is {@link ByteBuffAllocator#putbackBuffer}. 874 * </pre> 875 */ 876 public Recycler createRecycler(final BucketEntry bucketEntry) { 877 return () -> { 878 freeBucketEntry(bucketEntry); 879 return; 880 }; 881 } 882 883 /** 884 * NOTE: This method is only for test. 
885 */ 886 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 887 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 888 if (bucketEntry == null) { 889 return false; 890 } 891 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 892 } 893 894 /** 895 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 896 * {@link BucketEntry#isRpcRef} is false. <br/> 897 * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey} and 898 * {@link BucketEntry} can be removed. 899 * @param blockCacheKey {@link BlockCacheKey} to evict. 900 * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict. 901 * @return true to indicate whether we've evicted successfully or not. 902 */ 903 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 904 if (!bucketEntry.isRpcRef()) { 905 return doEvictBlock(blockCacheKey, bucketEntry, true); 906 } 907 return false; 908 } 909 910 /** 911 * Since HBASE-29249, the following properties governing freeSpace behaviour and block priorities 912 * were made dynamically configurable: hbase.bucketcache.acceptfactor, 913 * hbase.bucketcache.minfactor, hbase.bucketcache.extrafreefactor, 914 * hbase.bucketcache.single.factor, hbase.bucketcache.multi.factor and 915 * hbase.bucketcache.memory.factor. The 916 * hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the 917 * publishing of blocks for the cache writer threads during prefetch reads only (client reads 918 * wouldn't get delayed). It has also been made dynamically configurable since HBASE-29249. The 919 * hbase.bucketcache.persist.intervalinmillis property determines the frequency for saving the 920 * persistent cache, and it has also been made dynamically configurable since HBASE-29249. The 921 * hbase.bucketcache.persistence.chunksize property determines the size of the persistent file 922 * splits (due to the limitation of the maximum allowed protobuf size), and it has also been made 923 * dynamically configurable since HBASE-29249. 924 * @param config the new configuration to be updated.
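 * <p>
 * For reference, the compile-time defaults of the tunable factors are listed below (values
 * taken from the DEFAULT_* constants in this class; the single, multi and memory factors must
 * sum to 1.0, as enforced by sanityCheckConfigs()):
 *
 * <pre>
 *   hbase.bucketcache.acceptfactor     0.95
 *   hbase.bucketcache.minfactor        0.85
 *   hbase.bucketcache.extrafreefactor  0.10
 *   hbase.bucketcache.single.factor    0.25
 *   hbase.bucketcache.multi.factor     0.50
 *   hbase.bucketcache.memory.factor    0.25
 * </pre>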
925 */ 926 @Override 927 public void onConfigurationChange(Configuration config) { 928 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 929 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 930 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 931 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 932 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 933 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 934 this.queueAdditionWaitTime = 935 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 936 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 937 this.persistenceChunkSize = 938 conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); 939 sanityCheckConfigs(); 940 } 941 942 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 943 return ramCache.remove(cacheKey, re -> { 944 if (re != null) { 945 this.blockNumber.decrement(); 946 this.heapSize.add(-1 * re.getData().heapSize()); 947 } 948 }); 949 } 950 951 public boolean isCacheInconsistent() { 952 return isCacheInconsistent.get(); 953 } 954 955 public void setCacheInconsistent(boolean setCacheInconsistent) { 956 isCacheInconsistent.set(setCacheInconsistent); 957 } 958 959 protected void setCacheState(CacheState state) { 960 cacheState = state; 961 } 962 963 /* 964 * Statistics thread. Periodically output cache statistics to the log. 965 */ 966 private static class StatisticsThread extends Thread { 967 private final BucketCache bucketCache; 968 969 public StatisticsThread(BucketCache bucketCache) { 970 super("BucketCacheStatsThread"); 971 setDaemon(true); 972 this.bucketCache = bucketCache; 973 } 974 975 @Override 976 public void run() { 977 bucketCache.logStats(); 978 } 979 } 980 981 public void logStats() { 982 if (!isCacheInitialized("BucketCache::logStats")) { 983 return; 984 } 985 986 long totalSize = bucketAllocator.getTotalSize(); 987 long usedSize = bucketAllocator.getUsedSize(); 988 long freeSize = totalSize - usedSize; 989 long cacheSize = getRealCacheSize(); 990 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 991 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 992 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 993 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 994 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 995 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 996 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 997 + (cacheStats.getHitCount() == 0 998 ? "0," 999 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 1000 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 1001 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 1002 + (cacheStats.getHitCachingCount() == 0 1003 ? 
"0," 1004 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 1005 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 1006 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 1007 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount=" 1008 + backingMap.size()); 1009 cacheStats.reset(); 1010 1011 bucketAllocator.logDebugStatistics(); 1012 } 1013 1014 public long getRealCacheSize() { 1015 return this.realCacheSize.sum(); 1016 } 1017 1018 public long acceptableSize() { 1019 if (!isCacheInitialized("BucketCache::acceptableSize")) { 1020 return 0; 1021 } 1022 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 1023 } 1024 1025 long getPartitionSize(float partitionFactor) { 1026 if (!isCacheInitialized("BucketCache::getPartitionSize")) { 1027 return 0; 1028 } 1029 1030 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 1031 } 1032 1033 /** 1034 * Return the count of bucketSizeinfos still need free space 1035 */ 1036 private int bucketSizesAboveThresholdCount(float minFactor) { 1037 if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { 1038 return 0; 1039 } 1040 1041 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1042 int fullCount = 0; 1043 for (int i = 0; i < stats.length; i++) { 1044 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1045 freeGoal = Math.max(freeGoal, 1); 1046 if (stats[i].freeCount() < freeGoal) { 1047 fullCount++; 1048 } 1049 } 1050 return fullCount; 1051 } 1052 1053 /** 1054 * This method will find the buckets that are minimally occupied and are not reference counted and 1055 * will free them completely without any constraint on the access times of the elements, and as a 1056 * process will completely free at most the number of buckets passed, sometimes it might not due 1057 * to changing refCounts 1058 * @param completelyFreeBucketsNeeded number of buckets to free 1059 **/ 1060 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 1061 if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { 1062 return; 1063 } 1064 1065 if (completelyFreeBucketsNeeded != 0) { 1066 // First we will build a set where the offsets are reference counted, usually 1067 // this set is small around O(Handler Count) unless something else is wrong 1068 Set<Integer> inUseBuckets = new HashSet<>(); 1069 backingMap.forEach((k, be) -> { 1070 if (be.isRpcRef()) { 1071 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 1072 } 1073 }); 1074 Set<Integer> candidateBuckets = 1075 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 1076 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 1077 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 1078 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 1079 } 1080 } 1081 } 1082 } 1083 1084 private long calculateBytesToFree(StringBuilder msgBuffer) { 1085 long bytesToFreeWithoutExtra = 0; 1086 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1087 long[] bytesToFreeForBucket = new long[stats.length]; 1088 for (int i = 0; i < stats.length; i++) { 1089 bytesToFreeForBucket[i] = 0; 1090 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1091 freeGoal = Math.max(freeGoal, 1); 1092 if (stats[i].freeCount() < freeGoal) { 1093 
bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1094 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1095 if (msgBuffer != null) { 1096 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1097 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1098 } 1099 } 1100 } 1101 if (msgBuffer != null) { 1102 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1103 } 1104 return bytesToFreeWithoutExtra; 1105 } 1106 1107 /** 1108 * Free space if the used size exceeds acceptableSize() or if a block of some size could not be 1109 * allocated. When freeing space, we use the LRU algorithm and ensure that at least some 1110 * blocks are evicted 1111 * @param why Why we are being called 1112 */ 1113 void freeSpace(final String why) { 1114 if (!isCacheInitialized("BucketCache::freeSpace")) { 1115 return; 1116 } 1117 // Ensure only one freeSpace run at a time 1118 if (!freeSpaceLock.tryLock()) { 1119 return; 1120 } 1121 try { 1122 freeInProgress = true; 1123 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1124 long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer); 1125 if (bytesToFreeWithoutExtra <= 0) { 1126 return; 1127 } 1128 long currentSize = bucketAllocator.getUsedSize(); 1129 long totalSize = bucketAllocator.getTotalSize(); 1130 if (LOG.isDebugEnabled() && msgBuffer != null) { 1131 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used=" 1132 + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1133 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1134 + StringUtils.byteDesc(totalSize)); 1135 } 1136 long bytesToFreeWithExtra = 1137 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1138 // Instantiate priority buckets 1139 BucketEntryGroup bucketSingle = 1140 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1141 BucketEntryGroup bucketMulti = 1142 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1143 BucketEntryGroup bucketMemory = 1144 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1145 1146 Set<String> allValidFiles = null; 1147 // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not. 1148 // See further comments below for more details. 1149 if (onlineRegions != null) { 1150 allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions); 1151 } 1152 // the cached time is recorded in nanos, so we need to convert the grace period accordingly 1153 long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000; 1154 long bytesFreed = 0; 1155 // Scan the entire map, putting each bucket entry into the appropriate bucket entry 1156 // group 1157 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1158 BlockCacheKey key = bucketEntryWithKey.getKey(); 1159 BucketEntry entry = bucketEntryWithKey.getValue(); 1160 // Under certain conditions, blocks for regions no longer on the current region server might 1161 // be lingering in the cache. For example, when using the persistent cache feature, if the 1162 // RS crashes and different regions are assigned to it once it is back online, blocks 1163 // for the previously online regions would be recovered and stay in the cache. These would be 1164 // "orphan" blocks, as the files these blocks belong to are not in any of the online 1165 // regions.
1166 // "Orphan" blocks are a pure waste of cache space and should be evicted first during 1167 // the freespace run. 1168 // Compactions and flushes may cache blocks before their files are completely written. In 1169 // these cases the file won't be found in any of the online region stores, but the block 1170 // shouldn't be evicted. To avoid this, we defined the 1171 // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace 1172 // period (default 24 hours) before a block is checked for being an orphan block. 1173 if ( 1174 allValidFiles != null 1175 && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos) 1176 ) { 1177 if (!allValidFiles.contains(key.getHfileName())) { 1178 if (evictBucketEntryIfNoRpcReferenced(key, entry)) { 1179 // We calculate the freed bytes, but we don't stop if the goal was reached because 1180 // these are orphan blocks anyway, so let's leverage this run of freeSpace 1181 // to get rid of all orphans at once. 1182 bytesFreed += entry.getLength(); 1183 continue; 1184 } 1185 } 1186 } 1187 switch (entry.getPriority()) { 1188 case SINGLE: { 1189 bucketSingle.add(bucketEntryWithKey); 1190 break; 1191 } 1192 case MULTI: { 1193 bucketMulti.add(bucketEntryWithKey); 1194 break; 1195 } 1196 case MEMORY: { 1197 bucketMemory.add(bucketEntryWithKey); 1198 break; 1199 } 1200 } 1201 } 1202 PriorityQueue<BucketEntryGroup> bucketQueue = 1203 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1204 1205 bucketQueue.add(bucketSingle); 1206 bucketQueue.add(bucketMulti); 1207 bucketQueue.add(bucketMemory); 1208 1209 int remainingBuckets = bucketQueue.size(); 1210 1211 BucketEntryGroup bucketGroup; 1212 while ((bucketGroup = bucketQueue.poll()) != null) { 1213 long overflow = bucketGroup.overflow(); 1214 if (overflow > 0) { 1215 long bucketBytesToFree = 1216 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1217 bytesFreed += bucketGroup.free(bucketBytesToFree); 1218 } 1219 remainingBuckets--; 1220 } 1221 1222 // Check and free if there are buckets that still need freeing of space 1223 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1224 bucketQueue.clear(); 1225 remainingBuckets = 3; 1226 bucketQueue.add(bucketSingle); 1227 bucketQueue.add(bucketMulti); 1228 bucketQueue.add(bucketMemory); 1229 while ((bucketGroup = bucketQueue.poll()) != null) { 1230 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1231 bytesFreed += bucketGroup.free(bucketBytesToFree); 1232 remainingBuckets--; 1233 } 1234 } 1235 // Even after the above free we might still need more freeing because of 1236 // de-fragmentation of the buckets (also called the Slab Calcification problem), i.e. 1237 // there might be some buckets whose occupancy is very sparse and which thus don't 1238 // yield free space for the other bucket sizes. The fix for this is to evict some 1239 // of the buckets; we do this by evicting the buckets that are least filled. 1240 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1241 1242 if (LOG.isDebugEnabled()) { 1243 long single = bucketSingle.totalSize(); 1244 long multi = bucketMulti.totalSize(); 1245 long memory = bucketMemory.totalSize(); 1246 if (LOG.isDebugEnabled()) { 1247 LOG.debug("Bucket cache free space completed; " + "freed=" 1248 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1249 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1250 +
StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1251 } 1252 } 1253 } catch (Throwable t) { 1254 LOG.warn("Failed freeing space", t); 1255 } finally { 1256 cacheStats.evict(); 1257 freeInProgress = false; 1258 freeSpaceLock.unlock(); 1259 } 1260 } 1261 1262 // This handles flushing the RAM cache to IOEngine. 1263 class WriterThread extends Thread { 1264 private final BlockingQueue<RAMQueueEntry> inputQueue; 1265 private volatile boolean writerEnabled = true; 1266 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1267 1268 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1269 super("BucketCacheWriterThread"); 1270 this.inputQueue = queue; 1271 } 1272 1273 // Used for test 1274 void disableWriter() { 1275 this.writerEnabled = false; 1276 } 1277 1278 @Override 1279 public void run() { 1280 List<RAMQueueEntry> entries = new ArrayList<>(); 1281 try { 1282 while (isCacheEnabled() && writerEnabled) { 1283 try { 1284 try { 1285 // Blocks 1286 entries = getRAMQueueEntries(inputQueue, entries); 1287 } catch (InterruptedException ie) { 1288 if (!isCacheEnabled() || !writerEnabled) { 1289 break; 1290 } 1291 } 1292 doDrain(entries, metaBuff); 1293 } catch (Exception ioe) { 1294 LOG.error("WriterThread encountered error", ioe); 1295 } 1296 } 1297 } catch (Throwable t) { 1298 LOG.warn("Failed doing drain", t); 1299 } 1300 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1301 } 1302 } 1303 1304 /** 1305 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1306 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1307 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1308 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1309 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1310 * bucketAllocator do not free its memory. 1311 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1312 * cacheKey, Cacheable newBlock) 1313 * @param key Block cache key 1314 * @param bucketEntry Bucket entry to put into backingMap. 1315 */ 1316 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1317 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1318 blocksByHFile.add(key); 1319 updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength()); 1320 if (previousEntry != null && previousEntry != bucketEntry) { 1321 previousEntry.withWriteLock(offsetLock, () -> { 1322 blockEvicted(key, previousEntry, false, false); 1323 return null; 1324 }); 1325 } 1326 } 1327 1328 /** 1329 * Prepare and return a warning message for Bucket Allocator Exception 1330 * @param fle The exception 1331 * @param re The RAMQueueEntry for which the exception was thrown. 1332 * @return A warning message created from the input RAMQueueEntry object. 
1333 */
1334 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
1335 final RAMQueueEntry re) {
1336 final StringBuilder sb = new StringBuilder();
1337 sb.append("Most recent failed allocation after ");
1338 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
1339 sb.append(" ms;");
1340 if (re != null) {
1341 if (re.getData() instanceof HFileBlock) {
1342 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
1343 final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
1344 final String tableName = Bytes.toString(fileContext.getTableName());
1345 if (tableName != null) {
1346 sb.append(" Table: ");
1347 sb.append(tableName);
1348 }
1349 if (columnFamily != null) {
1350 sb.append(" CF: ");
1351 sb.append(columnFamily);
1352 }
1353 sb.append(" HFile: ");
1354 if (fileContext.getHFileName() != null) {
1355 sb.append(fileContext.getHFileName());
1356 } else {
1357 sb.append(re.getKey());
1358 }
1359 } else {
1360 sb.append(" HFile: ");
1361 sb.append(re.getKey());
1362 }
1363 }
1364 sb.append(" Message: ");
1365 sb.append(fle.getMessage());
1366 return sb.toString();
1367 }
1368
1369 /**
1370 * Flush the entries in ramCache to the IOEngine and add a bucket entry to backingMap for each.
1371 * Process all entries passed in, even on failure, making sure every one is removed from
1372 * ramCache; otherwise we never undo the references and we'll OOME.
1373 * @param entries Presumes the list passed in here will be processed by this invocation only. No
1374 * interference expected.
1375 */
1376 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
1377 if (entries.isEmpty()) {
1378 return;
1379 }
1380 // This method is a little hard to follow. We run through the passed in entries and for each
1381 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
1382 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
1383 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
1384 // filling ramCache. We do the clean up by again running through the passed in entries,
1385 // doing extra work whenever the corresponding bucketEntries slot is non-null.
1386 final int size = entries.size();
1387 BucketEntry[] bucketEntries = new BucketEntry[size];
1388 // Index updated inside loop if success or if we can't succeed. We retry if cache is full
1389 // when we go to add an entry by going around the loop again without upping the index.
1390 int index = 0;
1391 while (isCacheEnabled() && index < size) {
1392 RAMQueueEntry re = null;
1393 try {
1394 re = entries.get(index);
1395 if (re == null) {
1396 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
1397 index++;
1398 continue;
1399 }
1400 // Reset the position for reuse.
1401 // It should be guaranteed that the data in the metaBuff has been transferred to the
1402 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
1403 // transferred with our current IOEngines. Should take care, when we have new kinds of
1404 // IOEngine in the future.
1405 metaBuff.clear();
1406 BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
1407 this::createRecycler, metaBuff, acceptableSize());
1408 // Successfully added. Up index and add bucketEntry. Clear io exceptions.
1409 bucketEntries[index] = bucketEntry; 1410 if (ioErrorStartTime > 0) { 1411 ioErrorStartTime = -1; 1412 } 1413 index++; 1414 } catch (BucketAllocatorException fle) { 1415 long currTs = EnvironmentEdgeManager.currentTime(); 1416 cacheStats.allocationFailed(); // Record the warning. 1417 if ( 1418 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1419 ) { 1420 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1421 allocFailLogPrevTs = currTs; 1422 } 1423 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1424 bucketEntries[index] = null; 1425 index++; 1426 } catch (CacheFullException cfe) { 1427 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1428 if (!freeInProgress && !re.isPrefetch()) { 1429 freeSpace("Full!"); 1430 } else if (re.isPrefetch()) { 1431 bucketEntries[index] = null; 1432 index++; 1433 } else { 1434 Thread.sleep(50); 1435 } 1436 } catch (IOException ioex) { 1437 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1438 LOG.error("Failed writing to bucket cache", ioex); 1439 checkIOErrorIsTolerated(); 1440 } 1441 } 1442 1443 // Make sure data pages are written on media before we update maps. 1444 try { 1445 ioEngine.sync(); 1446 } catch (IOException ioex) { 1447 LOG.error("Failed syncing IO engine", ioex); 1448 checkIOErrorIsTolerated(); 1449 // Since we failed sync, free the blocks in bucket allocator 1450 for (int i = 0; i < entries.size(); ++i) { 1451 BucketEntry bucketEntry = bucketEntries[i]; 1452 if (bucketEntry != null) { 1453 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1454 bucketEntries[i] = null; 1455 } 1456 } 1457 } 1458 1459 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1460 // success or error. 1461 for (int i = 0; i < size; ++i) { 1462 BlockCacheKey key = entries.get(i).getKey(); 1463 // Only add if non-null entry. 1464 if (bucketEntries[i] != null) { 1465 putIntoBackingMap(key, bucketEntries[i]); 1466 if (ioEngine.isPersistent()) { 1467 setCacheInconsistent(true); 1468 } 1469 } 1470 // Always remove from ramCache even if we failed adding it to the block cache above. 1471 boolean existed = ramCache.remove(key, re -> { 1472 if (re != null) { 1473 heapSize.add(-1 * re.getData().heapSize()); 1474 } 1475 }); 1476 if (!existed && bucketEntries[i] != null) { 1477 // Block should have already been evicted. Remove it and free space. 1478 final BucketEntry bucketEntry = bucketEntries[i]; 1479 bucketEntry.withWriteLock(offsetLock, () -> { 1480 if (backingMap.remove(key, bucketEntry)) { 1481 blockEvicted(key, bucketEntry, false, false); 1482 } 1483 return null; 1484 }); 1485 } 1486 long used = bucketAllocator.getUsedSize(); 1487 if (!entries.get(i).isPrefetch() && used > acceptableSize()) { 1488 LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey()); 1489 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1490 } 1491 } 1492 1493 } 1494 1495 /** 1496 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1497 * returning. 1498 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1499 * in case. 1500 * @param q The queue to take from. 1501 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 
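 * <p>Illustrative usage only (a sketch mirroring what {@link WriterThread#run()} in this class
 * already does, not additional API): the caller reuses one list across iterations so its
 * allocation is retained between batches.
 * <pre>{@code
 * List<RAMQueueEntry> batch = new ArrayList<>();
 * batch = getRAMQueueEntries(inputQueue, batch); // blocks for the first entry, then drains the rest
 * doDrain(batch, metaBuff);
 * }</pre>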
1502 */ 1503 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1504 List<RAMQueueEntry> receptacle) throws InterruptedException { 1505 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1506 // ok even if list grew to accommodate thousands. 1507 receptacle.clear(); 1508 receptacle.add(q.take()); 1509 q.drainTo(receptacle); 1510 return receptacle; 1511 } 1512 1513 /** 1514 * @see #retrieveFromFile(int[]) 1515 */ 1516 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1517 justification = "false positive, try-with-resources ensures close is called.") 1518 void persistToFile() throws IOException { 1519 LOG.debug("Thread {} started persisting bucket cache to file", 1520 Thread.currentThread().getName()); 1521 if (!isCachePersistent()) { 1522 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1523 } 1524 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1525 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1526 LOG.debug("Persist in new chunked persistence format."); 1527 1528 persistChunkedBackingMap(fos); 1529 1530 LOG.debug( 1531 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1532 + " file name: {}", 1533 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1534 } catch (IOException e) { 1535 LOG.error("Failed to persist bucket cache to file", e); 1536 throw e; 1537 } catch (Throwable e) { 1538 LOG.error("Failed during persist bucket cache to file: ", e); 1539 throw e; 1540 } 1541 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1542 Thread.currentThread().getName()); 1543 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1544 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1545 + "RS crashes/restarts before we successfully checkpoint again."); 1546 } 1547 } 1548 1549 public boolean isCachePersistent() { 1550 return ioEngine.isPersistent() && persistencePath != null; 1551 } 1552 1553 @Override 1554 public Optional<Map<String, Long>> getRegionCachedInfo() { 1555 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1556 } 1557 1558 /** 1559 * @see #persistToFile() 1560 */ 1561 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1562 LOG.info("Started retrieving bucket cache from file"); 1563 File persistenceFile = new File(persistencePath); 1564 if (!persistenceFile.exists()) { 1565 LOG.warn("Persistence file missing! " 1566 + "It's ok if it's first run after enabling persistent cache."); 1567 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1568 blockNumber.add(backingMap.size()); 1569 backingMapValidated.set(true); 1570 return; 1571 } 1572 assert !isCacheEnabled(); 1573 1574 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1575 int pblen = ProtobufMagic.lengthOfPBMagic(); 1576 byte[] pbuf = new byte[pblen]; 1577 IOUtils.readFully(in, pbuf, 0, pblen); 1578 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1579 LOG.info("Reading old format of persistence."); 1580 // The old non-chunked version of backing map persistence. 1581 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1582 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1583 // The new persistence format of chunked persistence. 
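// (Illustrative note, an assumption inferred from retrieveChunkedBackingMap and
// persistChunkedBackingMap below, not taken from the original sources:) after the PB_MAGIC_V2
// prefix the file is expected to hold one delimited BucketCacheEntry with the global details
// (capacity, io/map classes, deserializers, cached-files map, checksum), followed by delimited
// BackingMap chunks until EOF.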
1584 LOG.info("Reading new chunked format of persistence."); 1585 retrieveChunkedBackingMap(in); 1586 } else { 1587 // In 3.0 we have enough flexibility to dump the old cache data. 1588 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1589 throw new IOException( 1590 "Persistence file does not start with protobuf magic number. " + persistencePath); 1591 } 1592 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1593 blockNumber.add(backingMap.size()); 1594 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1595 } 1596 } 1597 1598 private void updateRegionSizeMapWhileRetrievingFromFile() { 1599 // Update the regionCachedSize with the region size while restarting the region server 1600 if (LOG.isDebugEnabled()) { 1601 LOG.debug("Updating region size map after retrieving cached file list"); 1602 dumpPrefetchList(); 1603 } 1604 regionCachedSize.clear(); 1605 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1606 // Get the region name for each file 1607 String regionEncodedName = hFileSize.getFirst(); 1608 long cachedFileSize = hFileSize.getSecond(); 1609 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1610 (oldpf, fileSize) -> oldpf + fileSize); 1611 }); 1612 } 1613 1614 private void dumpPrefetchList() { 1615 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1616 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1617 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1618 } 1619 } 1620 1621 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1622 throws IOException { 1623 if (capacitySize != cacheCapacity) { 1624 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1625 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1626 } 1627 if (!ioEngine.getClass().getName().equals(ioclass)) { 1628 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1629 + ioEngine.getClass().getName()); 1630 } 1631 if (!backingMap.getClass().getName().equals(mapclass)) { 1632 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1633 + backingMap.getClass().getName()); 1634 } 1635 } 1636 1637 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1638 try { 1639 if (proto.hasChecksum()) { 1640 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1641 algorithm); 1642 } 1643 backingMapValidated.set(true); 1644 } catch (IOException e) { 1645 LOG.warn("Checksum for cache file failed. " 1646 + "We need to validate each cache key in the backing map. " 1647 + "This may take some time, so we'll do it in a background thread,"); 1648 1649 Runnable cacheValidator = () -> { 1650 while (bucketAllocator == null) { 1651 try { 1652 Thread.sleep(50); 1653 } catch (InterruptedException ex) { 1654 throw new RuntimeException(ex); 1655 } 1656 } 1657 long startTime = EnvironmentEdgeManager.currentTime(); 1658 int totalKeysOriginally = backingMap.size(); 1659 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1660 try { 1661 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1662 } catch (IOException e1) { 1663 LOG.debug("Check for key {} failed. 
Evicting.", keyEntry.getKey()); 1664 evictBlock(keyEntry.getKey()); 1665 fileNotFullyCached(keyEntry.getKey().getHfileName()); 1666 } 1667 } 1668 backingMapValidated.set(true); 1669 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", 1670 totalKeysOriginally, backingMap.size(), 1671 (EnvironmentEdgeManager.currentTime() - startTime)); 1672 }; 1673 Thread t = new Thread(cacheValidator); 1674 t.setDaemon(true); 1675 t.start(); 1676 } 1677 } 1678 1679 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1680 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1681 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1682 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1683 backingMap.putAll(pair2.getFirst()); 1684 blocksByHFile.addAll(pair2.getSecond()); 1685 } 1686 1687 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1688 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1689 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1690 this::createRecycler); 1691 backingMap = pair.getFirst(); 1692 blocksByHFile = pair.getSecond(); 1693 fullyCachedFiles.clear(); 1694 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1695 1696 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1697 fullyCachedFiles.size()); 1698 1699 verifyFileIntegrity(proto); 1700 updateRegionSizeMapWhileRetrievingFromFile(); 1701 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1702 } 1703 1704 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1705 LOG.debug( 1706 "persistToFile: before persisting backing map size: {}, " 1707 + "fullycachedFiles size: {}, chunkSize: {}", 1708 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1709 1710 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1711 1712 LOG.debug( 1713 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1714 backingMap.size(), fullyCachedFiles.size()); 1715 } 1716 1717 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1718 1719 // Read the first chunk that has all the details. 1720 BucketCacheProtos.BucketCacheEntry cacheEntry = 1721 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1722 1723 fullyCachedFiles.clear(); 1724 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1725 1726 backingMap.clear(); 1727 blocksByHFile.clear(); 1728 1729 // Read the backing map entries in batches. 1730 int numChunks = 0; 1731 while (in.available() > 0) { 1732 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1733 cacheEntry.getDeserializersMap()); 1734 numChunks++; 1735 } 1736 1737 LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size()); 1738 verifyFileIntegrity(cacheEntry); 1739 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1740 cacheEntry.getMapClass()); 1741 updateRegionSizeMapWhileRetrievingFromFile(); 1742 } 1743 1744 /** 1745 * Check whether we tolerate IO error this time. 
If the duration of the IOEngine throwing errors
1746 * exceeds ioErrorsTolerationDuration, we will disable the cache.
1747 */
1748 private void checkIOErrorIsTolerated() {
1749 long now = EnvironmentEdgeManager.currentTime();
1750 // Do a single read to a local variable to avoid timing issue - HBASE-24454
1751 long ioErrorStartTimeTmp = this.ioErrorStartTime;
1752 if (ioErrorStartTimeTmp > 0) {
1753 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1754 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1755 + "ms, disabling cache, please check your IOEngine");
1756 disableCache();
1757 }
1758 } else {
1759 this.ioErrorStartTime = now;
1760 }
1761 }
1762
1763 /**
1764 * Used to shut down the cache -or- turn it off in the case of something broken.
1765 */
1766 private void disableCache() {
1767 if (!isCacheEnabled()) {
1768 return;
1769 }
1770 LOG.info("Disabling cache");
1771 cacheState = CacheState.DISABLED;
1772 ioEngine.shutdown();
1773 this.scheduleThreadPool.shutdown();
1774 for (int i = 0; i < writerThreads.length; ++i)
1775 writerThreads[i].interrupt();
1776 this.ramCache.clear();
1777 if (!ioEngine.isPersistent() || persistencePath == null) {
1778 // Only clear the index if the cache is not persistent; for a persistent IOEngine with a persistence path we keep the backingMap so it can be serialized out on shutdown.
1779 this.backingMap.clear();
1780 this.blocksByHFile.clear();
1781 this.fullyCachedFiles.clear();
1782 this.regionCachedSize.clear();
1783 }
1784 if (cacheStats.getMetricsRollerScheduler() != null) {
1785 cacheStats.getMetricsRollerScheduler().shutdownNow();
1786 }
1787 }
1788
1789 private void join() throws InterruptedException {
1790 for (int i = 0; i < writerThreads.length; ++i)
1791 writerThreads[i].join();
1792 }
1793
1794 @Override
1795 public void shutdown() {
1796 disableCache();
1797 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1798 + persistencePath);
1799 if (ioEngine.isPersistent() && persistencePath != null) {
1800 try {
1801 join();
1802 if (cachePersister != null) {
1803 LOG.info("Shutting down cache persister thread.");
1804 cachePersister.shutdown();
1805 while (cachePersister.isAlive()) {
1806 Thread.sleep(10);
1807 }
1808 }
1809 persistToFile();
1810 } catch (IOException ex) {
1811 LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1812 } catch (InterruptedException e) {
1813 LOG.warn("Failed to persist data on exit", e);
1814 }
1815 }
1816 }
1817
1818 /**
1819 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
1820 * on different UT methods.
1821 */ 1822 @Override 1823 protected void finalize() { 1824 if (cachePersister != null && !cachePersister.isInterrupted()) { 1825 cachePersister.interrupt(); 1826 } 1827 } 1828 1829 @Override 1830 public CacheStats getStats() { 1831 return cacheStats; 1832 } 1833 1834 public BucketAllocator getAllocator() { 1835 return this.bucketAllocator; 1836 } 1837 1838 @Override 1839 public long heapSize() { 1840 return this.heapSize.sum(); 1841 } 1842 1843 @Override 1844 public long size() { 1845 return this.realCacheSize.sum(); 1846 } 1847 1848 @Override 1849 public long getCurrentDataSize() { 1850 return size(); 1851 } 1852 1853 @Override 1854 public long getFreeSize() { 1855 if (!isCacheInitialized("BucketCache:getFreeSize")) { 1856 return 0; 1857 } 1858 return this.bucketAllocator.getFreeSize(); 1859 } 1860 1861 @Override 1862 public long getBlockCount() { 1863 return this.blockNumber.sum(); 1864 } 1865 1866 @Override 1867 public long getDataBlockCount() { 1868 return getBlockCount(); 1869 } 1870 1871 @Override 1872 public long getCurrentSize() { 1873 if (!isCacheInitialized("BucketCache::getCurrentSize")) { 1874 return 0; 1875 } 1876 return this.bucketAllocator.getUsedSize(); 1877 } 1878 1879 protected String getAlgorithm() { 1880 return algorithm; 1881 } 1882 1883 /** 1884 * Evicts all blocks for a specific HFile. 1885 * <p> 1886 * This is used for evict-on-close to remove all blocks of a specific HFile. 1887 * @return the number of blocks evicted 1888 */ 1889 @Override 1890 public int evictBlocksByHfileName(String hfileName) { 1891 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); 1892 } 1893 1894 @Override 1895 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { 1896 fileNotFullyCached(hfileName); 1897 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); 1898 LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), 1899 hfileName, initOffset, endOffset); 1900 int numEvicted = 0; 1901 for (BlockCacheKey key : keySet) { 1902 if (evictBlock(key)) { 1903 ++numEvicted; 1904 } 1905 } 1906 return numEvicted; 1907 } 1908 1909 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) { 1910 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, 1911 new BlockCacheKey(hfileName, end), true); 1912 } 1913 1914 /** 1915 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1916 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1917 * number of elements out of each according to configuration parameters and their relative sizes. 1918 */ 1919 private class BucketEntryGroup { 1920 1921 private CachedEntryQueue queue; 1922 private long totalSize = 0; 1923 private long bucketSize; 1924 1925 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1926 this.bucketSize = bucketSize; 1927 queue = new CachedEntryQueue(bytesToFree, blockSize); 1928 totalSize = 0; 1929 } 1930 1931 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1932 totalSize += block.getValue().getLength(); 1933 queue.add(block); 1934 } 1935 1936 public long free(long toFree) { 1937 Map.Entry<BlockCacheKey, BucketEntry> entry; 1938 long freedBytes = 0; 1939 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1940 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
1941 while ((entry = queue.pollLast()) != null) { 1942 BlockCacheKey blockCacheKey = entry.getKey(); 1943 BucketEntry be = entry.getValue(); 1944 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1945 freedBytes += be.getLength(); 1946 } 1947 if (freedBytes >= toFree) { 1948 return freedBytes; 1949 } 1950 } 1951 return freedBytes; 1952 } 1953 1954 public long overflow() { 1955 return totalSize - bucketSize; 1956 } 1957 1958 public long totalSize() { 1959 return totalSize; 1960 } 1961 } 1962 1963 /** 1964 * Block Entry stored in the memory with key,data and so on 1965 */ 1966 static class RAMQueueEntry { 1967 private final BlockCacheKey key; 1968 private final Cacheable data; 1969 private long accessCounter; 1970 private boolean inMemory; 1971 private boolean isCachePersistent; 1972 1973 private boolean isPrefetch; 1974 1975 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1976 boolean isCachePersistent, boolean isPrefetch) { 1977 this.key = bck; 1978 this.data = data; 1979 this.accessCounter = accessCounter; 1980 this.inMemory = inMemory; 1981 this.isCachePersistent = isCachePersistent; 1982 this.isPrefetch = isPrefetch; 1983 } 1984 1985 public Cacheable getData() { 1986 return data; 1987 } 1988 1989 public BlockCacheKey getKey() { 1990 return key; 1991 } 1992 1993 public boolean isPrefetch() { 1994 return isPrefetch; 1995 } 1996 1997 public void access(long accessCounter) { 1998 this.accessCounter = accessCounter; 1999 } 2000 2001 private ByteBuffAllocator getByteBuffAllocator() { 2002 if (data instanceof HFileBlock) { 2003 return ((HFileBlock) data).getByteBuffAllocator(); 2004 } 2005 return ByteBuffAllocator.HEAP; 2006 } 2007 2008 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 2009 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 2010 ByteBuffer metaBuff, final Long acceptableSize) throws IOException { 2011 int len = data.getSerializedLength(); 2012 // This cacheable thing can't be serialized 2013 if (len == 0) { 2014 return null; 2015 } 2016 if (isCachePersistent && data instanceof HFileBlock) { 2017 len += Long.BYTES; // we need to record the cache time for consistency check in case of 2018 // recovery 2019 } 2020 long offset = alloc.allocateBlock(len); 2021 // In the case of prefetch, we want to avoid freeSpace runs when the cache is full. 2022 // this makes the cache allocation more predictable, and is particularly important 2023 // when persistent cache is enabled, as it won't trigger evictions of the recovered blocks, 2024 // which are likely the most accessed and relevant blocks in the cache. 2025 if (isPrefetch() && alloc.getUsedSize() > acceptableSize) { 2026 alloc.freeBlock(offset, len); 2027 return null; 2028 } 2029 boolean succ = false; 2030 BucketEntry bucketEntry = null; 2031 try { 2032 int diskSizeWithHeader = (data instanceof HFileBlock) 2033 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 2034 : data.getSerializedLength(); 2035 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 2036 createRecycler, getByteBuffAllocator()); 2037 bucketEntry.setDeserializerReference(data.getDeserializer()); 2038 if (data instanceof HFileBlock) { 2039 // If an instance of HFileBlock, save on some allocations. 
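// (Illustrative note summarizing the writes below, not from the original sources:) for a
// persistent cache the on-engine layout of an entry is assumed to be
//   [ cachedTime : 8 bytes ][ block payload ][ block metadata ]
// which is why 'len' was grown by Long.BYTES above; for a non-persistent cache the payload is
// written directly at 'offset' and no cachedTime prefix is stored.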
2040 HFileBlock block = (HFileBlock) data; 2041 ByteBuff sliceBuf = block.getBufferReadOnly(); 2042 block.getMetaData(metaBuff); 2043 // adds the cache time prior to the block and metadata part 2044 if (isCachePersistent) { 2045 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 2046 buffer.putLong(bucketEntry.getCachedTime()); 2047 buffer.rewind(); 2048 ioEngine.write(buffer, offset); 2049 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 2050 } else { 2051 ioEngine.write(sliceBuf, offset); 2052 } 2053 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 2054 } else { 2055 // Only used for testing. 2056 ByteBuffer bb = ByteBuffer.allocate(len); 2057 data.serialize(bb, true); 2058 ioEngine.write(bb, offset); 2059 } 2060 succ = true; 2061 } finally { 2062 if (!succ) { 2063 alloc.freeBlock(offset, len); 2064 } 2065 } 2066 realCacheSize.add(len); 2067 return bucketEntry; 2068 } 2069 } 2070 2071 /** 2072 * Only used in test 2073 */ 2074 void stopWriterThreads() throws InterruptedException { 2075 for (WriterThread writerThread : writerThreads) { 2076 writerThread.disableWriter(); 2077 writerThread.interrupt(); 2078 writerThread.join(); 2079 } 2080 } 2081 2082 @Override 2083 public Iterator<CachedBlock> iterator() { 2084 // Don't bother with ramcache since stuff is in here only a little while. 2085 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 2086 return new Iterator<CachedBlock>() { 2087 private final long now = System.nanoTime(); 2088 2089 @Override 2090 public boolean hasNext() { 2091 return i.hasNext(); 2092 } 2093 2094 @Override 2095 public CachedBlock next() { 2096 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 2097 return new CachedBlock() { 2098 @Override 2099 public String toString() { 2100 return BlockCacheUtil.toString(this, now); 2101 } 2102 2103 @Override 2104 public BlockPriority getBlockPriority() { 2105 return e.getValue().getPriority(); 2106 } 2107 2108 @Override 2109 public BlockType getBlockType() { 2110 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 
2111 return null;
2112 }
2113
2114 @Override
2115 public long getOffset() {
2116 return e.getKey().getOffset();
2117 }
2118
2119 @Override
2120 public long getSize() {
2121 return e.getValue().getLength();
2122 }
2123
2124 @Override
2125 public long getCachedTime() {
2126 return e.getValue().getCachedTime();
2127 }
2128
2129 @Override
2130 public String getFilename() {
2131 return e.getKey().getHfileName();
2132 }
2133
2134 @Override
2135 public int compareTo(CachedBlock other) {
2136 int diff = this.getFilename().compareTo(other.getFilename());
2137 if (diff != 0) return diff;
2138
2139 diff = Long.compare(this.getOffset(), other.getOffset());
2140 if (diff != 0) return diff;
2141 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
2142 throw new IllegalStateException(
2143 "" + this.getCachedTime() + ", " + other.getCachedTime());
2144 }
2145 return Long.compare(other.getCachedTime(), this.getCachedTime());
2146 }
2147
2148 @Override
2149 public int hashCode() {
2150 return e.getKey().hashCode();
2151 }
2152
2153 @Override
2154 public boolean equals(Object obj) {
2155 if (obj instanceof CachedBlock) {
2156 CachedBlock cb = (CachedBlock) obj;
2157 return compareTo(cb) == 0;
2158 } else {
2159 return false;
2160 }
2161 }
2162 };
2163 }
2164
2165 @Override
2166 public void remove() {
2167 throw new UnsupportedOperationException();
2168 }
2169 };
2170 }
2171
2172 @Override
2173 public BlockCache[] getBlockCaches() {
2174 return null;
2175 }
2176
2177 public int getRpcRefCount(BlockCacheKey cacheKey) {
2178 BucketEntry bucketEntry = backingMap.get(cacheKey);
2179 if (bucketEntry != null) {
2180 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
2181 }
2182 return 0;
2183 }
2184
2185 float getAcceptableFactor() {
2186 return acceptableFactor;
2187 }
2188
2189 float getMinFactor() {
2190 return minFactor;
2191 }
2192
2193 float getExtraFreeFactor() {
2194 return extraFreeFactor;
2195 }
2196
2197 float getSingleFactor() {
2198 return singleFactor;
2199 }
2200
2201 float getMultiFactor() {
2202 return multiFactor;
2203 }
2204
2205 float getMemoryFactor() {
2206 return memoryFactor;
2207 }
2208
2209 long getQueueAdditionWaitTime() {
2210 return queueAdditionWaitTime;
2211 }
2212
2213 long getPersistenceChunkSize() {
2214 return persistenceChunkSize;
2215 }
2216
2217 long getBucketcachePersistInterval() {
2218 return bucketcachePersistInterval;
2219 }
2220
2221 public String getPersistencePath() {
2222 return persistencePath;
2223 }
2224
2225 /**
2226 * Wraps the delegate ConcurrentMap while maintaining the reference counts of its blocks.
2227 */
2228 static class RAMCache {
2229 /**
2230 * The map is declared as a {@link ConcurrentHashMap} explicitly here because, in
2231 * {@link RAMCache#get(BlockCacheKey)} and
2232 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)}, we need to guarantee
2233 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
2234 * func method must execute exactly once, only when the key is present (or absent) and under the
2235 * map's lock. Otherwise, the reference count of the block would be corrupted. Note that a
2236 * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
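 * <p>A minimal illustrative sketch (mirroring {@link RAMCache#get(BlockCacheKey)} below) of the
 * atomic get-and-retain that this choice enables:
 * <pre>{@code
 * RAMQueueEntry re = delegate.computeIfPresent(key, (k, v) -> {
 *   v.getData().retain(); // runs under the map's bin lock, so a concurrent remove cannot release first
 *   return v;
 * });
 * }</pre>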
2237 */
2238 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
2239
2240 public boolean containsKey(BlockCacheKey key) {
2241 return delegate.containsKey(key);
2242 }
2243
2244 public RAMQueueEntry get(BlockCacheKey key) {
2245 return delegate.computeIfPresent(key, (k, re) -> {
2246 // It'll be referenced by RPC, so retain atomically here. If the get and the retain were not
2247 // atomic, another thread could remove and release the block, and we would then retain a
2248 // block with refCnt=0, which is disallowed. (see HBASE-22422)
2249 re.getData().retain();
2250 return re;
2251 });
2252 }
2253
2254 /**
2255 * Return the previous associated value, or null if absent. It has the same meaning as
2256 * {@link ConcurrentMap#putIfAbsent(Object, Object)}
2257 */
2258 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
2259 AtomicBoolean absent = new AtomicBoolean(false);
2260 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
2261 // The RAMCache now references this entry, so its reference count should be incremented.
2262 entry.getData().retain();
2263 absent.set(true);
2264 return entry;
2265 });
2266 return absent.get() ? null : re;
2267 }
2268
2269 public boolean remove(BlockCacheKey key) {
2270 return remove(key, re -> {
2271 });
2272 }
2273
2274 /**
2275 * A {@link Consumer} is taken here because, once the removed entry releases its reference
2276 * count, its ByteBuffers may be recycled and accessing them outside this method would throw an
2277 * exception. The consumer gets to access the removed entry before its reference count is
2278 * released. Note: don't change the entry's reference count inside the {@link Consumer}.
2279 */
2280 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
2281 RAMQueueEntry previous = delegate.remove(key);
2282 action.accept(previous);
2283 if (previous != null) {
2284 previous.getData().release();
2285 }
2286 return previous != null;
2287 }
2288
2289 public boolean isEmpty() {
2290 return delegate.isEmpty();
2291 }
2292
2293 public void clear() {
2294 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
2295 while (it.hasNext()) {
2296 RAMQueueEntry re = it.next().getValue();
2297 it.remove();
2298 re.getData().release();
2299 }
2300 }
2301
2302 public boolean hasBlocksForFile(String fileName) {
2303 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName))
2304 .findFirst().isPresent();
2305 }
2306 }
2307
2308 public Map<BlockCacheKey, BucketEntry> getBackingMap() {
2309 return backingMap;
2310 }
2311
2312 public AtomicBoolean getBackingMapValidated() {
2313 return backingMapValidated;
2314 }
2315
2316 @Override
2317 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
2318 return Optional.of(fullyCachedFiles);
2319 }
2320
2321 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
2322 if (cacheConf.getBlockCache().isPresent()) {
2323 BlockCache bc = cacheConf.getBlockCache().get();
2324 if (bc instanceof CombinedBlockCache) {
2325 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
2326 if (l2 instanceof BucketCache) {
2327 return Optional.of((BucketCache) l2);
2328 }
2329 } else if (bc instanceof BucketCache) {
2330 return Optional.of((BucketCache) bc);
2331 }
2332 }
2333 return Optional.empty();
2334 }
2335
2336 @Override
2337 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
2338 long size) {
2339
// block eviction may be happening in the background as prefetch runs, 2340 // so we need to count all blocks for this file in the backing map under 2341 // a read lock for the block offset 2342 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2343 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2344 fileName, totalBlockCount, dataBlockCount); 2345 try { 2346 final MutableInt count = new MutableInt(); 2347 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2348 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2349 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2350 result = getAllCacheKeysForFile( 2351 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2352 Long.MAX_VALUE); 2353 } 2354 result.stream().forEach(entry -> { 2355 LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}", 2356 fileName.getName(), entry.getOffset()); 2357 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2358 lock.readLock().lock(); 2359 locks.add(lock); 2360 if (backingMap.containsKey(entry) && entry.getBlockType().isData()) { 2361 count.increment(); 2362 } 2363 }); 2364 // BucketCache would only have data blocks 2365 if (dataBlockCount == count.getValue()) { 2366 LOG.debug("File {} has now been fully cached.", fileName); 2367 fileCacheCompleted(fileName, size); 2368 } else { 2369 LOG.debug( 2370 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2371 + "Total data blocks for file: {}. " 2372 + "Checking for blocks pending cache in cache writer queue.", 2373 fileName, count.getValue(), dataBlockCount); 2374 if (ramCache.hasBlocksForFile(fileName.getName())) { 2375 for (ReentrantReadWriteLock lock : locks) { 2376 lock.readLock().unlock(); 2377 } 2378 locks.clear(); 2379 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2380 + "and try the verification again.", fileName); 2381 Thread.sleep(100); 2382 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2383 } else { 2384 LOG.info( 2385 "The total block count was {}. We found only {} data blocks cached from " 2386 + "a total of {} data blocks for file {}, " 2387 + "but no blocks pending caching. 
Maybe cache is full or evictions "
2388 + "happened concurrently to cache prefetch.",
2389 totalBlockCount, count, dataBlockCount, fileName);
2390 }
2391 }
2392 } catch (InterruptedException e) {
2393 throw new RuntimeException(e);
2394 } finally {
2395 for (ReentrantReadWriteLock lock : locks) {
2396 lock.readLock().unlock();
2397 }
2398 }
2399 }
2400
2401 @Override
2402 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
2403 if (!isCacheInitialized("blockFitsIntoTheCache")) {
2404 return Optional.of(false);
2405 }
2406
2407 long currentUsed = bucketAllocator.getUsedSize();
2408 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
2409 return Optional.of(result);
2410 }
2411
2412 @Override
2413 public Optional<Boolean> shouldCacheFile(String fileName) {
2414 // If we don't have the file in fullyCachedFiles, we should cache it.
2415 return Optional.of(!fullyCachedFiles.containsKey(fileName));
2416 }
2417
2418 @Override
2419 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
2420 boolean foundKey = backingMap.containsKey(key);
2421 // If there's no entry for the key itself, we need to check whether this key is for a reference,
2422 // and if so, look for a block from the referenced file using the getBlockForReference method.
2423 return Optional.of(foundKey ? true : getBlockForReference(key) != null);
2424 }
2425
2426 @Override
2427 public Optional<Integer> getBlockSize(BlockCacheKey key) {
2428 BucketEntry entry = backingMap.get(key);
2429 if (entry == null) {
2430 // The key might be for a reference; we may have found the block for the referenced file in
2431 // the cache when we first tried to cache it.
2432 entry = getBlockForReference(key);
2433 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader());
2434 } else {
2435 return Optional.of(entry.getOnDiskSizeWithHeader());
2436 }
2437
2438 }
2439
2440 boolean isCacheInitialized(String api) {
2441 if (cacheState == CacheState.INITIALIZING) {
2442 LOG.warn("Bucket initialisation pending at {}", api);
2443 return false;
2444 }
2445 return true;
2446 }
2447
2448 @Override
2449 public boolean waitForCacheInitialization(long timeout) {
2450 try {
2451 while (cacheState == CacheState.INITIALIZING) {
2452 if (timeout <= 0) {
2453 break;
2454 }
2455 Thread.sleep(100);
2456 timeout -= 100;
2457 }
2458 } finally {
2459 return isCacheEnabled(); // Note: this return also discards a possible InterruptedException from sleep() above; we simply report the current state.
2460 }
2461 }
2462}
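
/*
 * Illustrative usage sketch (not part of the class above): locating the BucketCache behind a
 * CacheConfig and reading a few of its metrics. The methods referenced exist in this class; the
 * 'cacheConf' variable is assumed to be supplied by the caller.
 *
 *   Optional<BucketCache> l2 = BucketCache.getBucketCacheFromCacheConfig(cacheConf);
 *   l2.ifPresent(cache -> {
 *     long usedBytes = cache.getCurrentSize();   // bytes currently allocated in the IOEngine
 *     long freeBytes = cache.getFreeSize();      // bytes still available to the bucket allocator
 *     long blockCount = cache.getBlockCount();   // number of blocks tracked by the cache
 *   });
 */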