/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIODS;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIOD_MINUTES_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIODS;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
 * {@link FileIOEngine} to store/read the block data.
 * <p>
 * Eviction is via a similar algorithm as used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 * <p>
 * BucketCache can be used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with an on-heap
 * BlockCache, to decrease CMS GC and heap fragmentation.
 * <p>
 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
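 * <p>
 * For illustration only, a file-backed bucket cache is typically enabled with configuration along
 * the lines of the sketch below (the path and size are placeholder values, not recommendations):
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.bucketcache.ioengine&lt;/name&gt;
 *   &lt;value&gt;file:/mnt/ssd/bucketcache.data&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;hbase.bucketcache.size&lt;/name&gt;
 *   &lt;value&gt;8192&lt;/value&gt; &lt;!-- cache size, in megabytes --&gt;
 * &lt;/property&gt;
 * </pre>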
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
    "hbase.bucketcache.persistence.chunksize";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** The cache age of blocks to check if the related file is present on any online regions. */
  static final String BLOCK_ORPHAN_GRACE_PERIOD =
    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";

  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each of the bucket size that is full
  static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;

  // In this map, store the block's meta data like offset, length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> Region -> File size. This map is used to track all files that completed
   * prefetch, together with the region they belong to and the total cached size for that region.
   */
  transient final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
  /**
   * Map of region -> total size of the region prefetched on this region server. This is the total
   * size of hFiles for this region prefetched on this region server
   */
  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();

  private transient BucketCachePersister cachePersister;

  /**
   * Enum to represent the state of cache
   */
  protected enum CacheState {
    // Initializing: State when the cache is being initialised from persistence.
    INITIALIZING,
    // Enabled: State when cache is initialised and is ready.
    ENABLED,
    // Disabled: State when the cache is disabled.
    DISABLED
  }

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile CacheState cacheState;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private final BucketCacheStats cacheStats;
  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  private transient Configuration conf;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read.
   */
  transient final IdReadWriteLock<Long> offsetLock;

  transient NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity, default algorithm is MD5
   */
  private String algorithm;

  private long persistenceChunkSize;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  private transient Map<String, HRegion> onlineRegions;

  private long orphanBlockGracePeriod = 0;

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, ioErrorsTolerationDuration, conf, null);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache block size is set to " + blockSize + ", can not be less than or equal to 0");
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.conf = conf;
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    this.onlineRegions = onlineRegions;
    this.orphanBlockGracePeriod =
      conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
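      // (Illustrative arithmetic, assuming a 16 KB block size: roughly 2^31 entries
      // * 2^14 bytes/entry = 2^45 bytes = 32 TiB.)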
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    // this sets the dynamic configs
    this.onConfigurationChange(conf);
    this.cacheStats =
      new BucketCacheStats(conf.getInt(BLOCKCACHE_STATS_PERIODS, DEFAULT_BLOCKCACHE_STATS_PERIODS),
        conf.getInt(BLOCKCACHE_STATS_PERIOD_MINUTES_KEY, DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES));

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
      + ", useStrongRef: " + useStrongRef);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
    this.cacheState = CacheState.INITIALIZING;

    this.allocFailLogPrevTs = 0;

    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
    instantiateWriterThreads();

    if (isCachePersistent()) {
      if (ioEngine instanceof FileIOEngine) {
        startBucketCachePersisterThread();
      }
      startPersistenceRetriever(bucketSizes, capacity);
    } else {
      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
      this.cacheState = CacheState.ENABLED;
      startWriterThreads();
    }

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName());
  }

  private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
    Runnable persistentCacheRetriever = () -> {
      try {
        retrieveFromFile(bucketSizes);
        LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath);
      } catch (Throwable ex) {
        LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt."
          + " Exception seen: ", persistencePath, ex);
        backingMap.clear();
        fullyCachedFiles.clear();
        backingMapValidated.set(true);
        regionCachedSize.clear();
        try {
          bucketAllocator = new BucketAllocator(capacity, bucketSizes);
        } catch (BucketAllocatorException allocatorException) {
          LOG.error("Exception during Bucket Allocation", allocatorException);
        }
      } finally {
        this.cacheState = CacheState.ENABLED;
        startWriterThreads();
      }
    };
    Thread t = new Thread(persistentCacheRetriever);
    t.start();
  }

  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than or equal to 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
    if (this.persistenceChunkSize <= 0) {
      persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
    }
  }
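
  // Worked example of the checks above (defaults from this class, for illustration): single 0.25
  // + multi 0.50 + memory 0.25 = 1.0, and minFactor 0.85 <= acceptableFactor 0.95, so the default
  // configuration passes every precondition.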

  /**
   * Called by the constructor to instantiate the writer threads.
   */
  private void instantiateWriterThreads() {
    final String threadName = Thread.currentThread().getName();
    for (int i = 0; i < this.writerThreads.length; ++i) {
      this.writerThreads[i] = new WriterThread(this.writerQueues.get(i));
      this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      this.writerThreads[i].setDaemon(true);
    }
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    LOG.info("Starting BucketCachePersisterThread");
    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.setDaemon(true);
    cachePersister.start();
  }

  @Override
  public boolean isCacheEnabled() {
    return this.cacheState == CacheState.ENABLED;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep the usage simple, the documented prefix is 'files:' whether one or multiple
      // file(s) are given, but 'file:' is also supported for compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out like in the case of mmapping
      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be directly referred to without having to copy them onheap. Once the RPC is done,
      // the blocks can be returned back as in the case of ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache- prefix with file:, files:, mmap:, pmem: or offheap");
    }
  }

  public boolean isCachePersistenceEnabled() {
    return persistencePath != null;
  }
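
  // Illustrative engine names accepted by getIOEngineFromName above (paths are hypothetical):
  //   "offheap"                       -> ByteBufferIOEngine
  //   "file:/mnt/ssd/bc.data"         -> FileIOEngine (single file)
  //   "files:/d1/bc.data,/d2/bc.data" -> FileIOEngine (multiple files)
  //   "mmap:/mnt/ssd/bc.mmap"         -> ExclusiveMemoryMmapIOEngine
  //   "pmem:/mnt/pmem/bc.data"        -> SharedMemoryMmapIOEngine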

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (isCacheEnabled()) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replace when there are RPC refs for the bucket entry in bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!isCacheEnabled()) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.debug("Caching key={}, item={}, key heap size={}", cacheKey, cachedItem,
      cacheKey.heapSize());
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated when
     * replacing an entry in ramCache. But the WriterThread also removes entries from ramCache and
     * updates the heap size; with ramCache.put(), the entry removed by the WriterThread might not
     * be the correct one, and the heap size accounting would then be wrong (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        LOG.error("Thread interrupted: ", e);
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }
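
  // Worked example of the queue selection in cacheBlockWithWaitInternal above (illustrative
  // numbers): with 3 writer queues and a key whose hashCode() is negative, (hash & 0x7FFFFFFF)
  // clears the sign bit, so the modulo is taken on a non-negative value and always yields a valid
  // queue index in [0, 3).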

  /**
   * If the passed cache key relates to a reference (&lt;hfile&gt;.&lt;parentEncRegion&gt;), this
   * method looks for the block from the referred file, in the cache. If present in the cache, the
   * block for the referred file is returned, otherwise, this method returns null. It will also
   * return null if the passed cache key doesn't relate to a reference.
   * @param key the BlockCacheKey instance to look for in the cache.
   * @return the cached block from the referred file, null if there's no such block in the cache or
   *         the passed key doesn't relate to a reference.
   */
  public BucketEntry getBlockForReference(BlockCacheKey key) {
    BucketEntry foundEntry = null;
    String referredFileName = null;
    if (StoreFileInfo.isReference(key.getHfileName())) {
      referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond();
    }
    if (referredFileName != null) {
      // Since we just need this key for a lookup, it's enough to use only name and offset
      BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset());
      foundEntry = backingMap.get(convertedCacheKey);
      LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(),
        convertedCacheKey, foundEntry);
    }
    return foundEntry;
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key                block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    if (!isCacheEnabled()) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    LOG.debug("bucket entry for key {}: {}", key,
      bucketEntry == null ? null : bucketEntry.offset());
    if (bucketEntry == null) {
      bucketEntry = getBlockForReference(key);
    }
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key, because its offset
        // may have changed. If we lock BlockCacheKey instead of offset, then we can only check
        // existence here.
        if (
          bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key))
        ) {
          // Read the block from IOEngine based on the bucketEntry's offset and length. NOTICE: the
          // block will use the refCnt of bucketEntry, which means if two HFileBlocks map to
          // the same BucketEntry, then all three will share the same refCnt.
          Cacheable cachedBlock = ioEngine.read(bucketEntry);
          if (ioEngine.usesSharedMemory()) {
            // If the IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt; do retain here, in order to count the number of RPC references
            cachedBlock.retain();
          }
          // Update the cache statistics.
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(System.nanoTime() - start);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (HBaseIOException hioex) {
        // When using the file io engine persistent cache,
        // the cache map state might differ from the actual cache. If we reach this block,
        // we should remove the cache key entry from the backing map
        backingMap.remove(key);
        fileNotFullyCached(key, bucketEntry);
        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
      if (ioEngine.isPersistent()) {
        fileNotFullyCached(cacheKey, bucketEntry);
      }
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
    if (ioEngine.isPersistent()) {
      setCacheInconsistent(true);
    }
  }

  private void fileNotFullyCached(BlockCacheKey key, BucketEntry entry) {
    // Update the region cached size before removing the file from fullyCachedFiles.
    // This computation should happen even if the file is not in the fullyCachedFiles map.
    updateRegionCachedSize(key, (entry.getLength() * -1));
    fullyCachedFiles.remove(key.getHfileName());
  }

  public void fileCacheCompleted(Path filePath, long size) {
    Pair<String, Long> pair = new Pair<>();
    // sets the region name
    String regionName = filePath.getParent().getParent().getName();
    pair.setFirst(regionName);
    pair.setSecond(size);
    fullyCachedFiles.put(filePath.getName(), pair);
  }

  private void updateRegionCachedSize(BlockCacheKey key, long cachedSize) {
    if (key.getRegionName() != null) {
      if (key.isArchived()) {
        LOG.trace("Skipping region cached size update for archived file:{} from region: {}",
          key.getHfileName(), key.getRegionName());
      } else {
        String regionName = key.getRegionName();
        regionCachedSize.merge(regionName, cachedSize,
          (previousSize, newBlockSize) -> previousSize + newBlockSize);
        // If all the blocks for a region are evicted from the cache,
        // remove the entry for that region from regionCachedSize map.
        if (regionCachedSize.getOrDefault(regionName, 0L) <= 0) {
          regionCachedSize.remove(regionName);
        }
      }
    }
  }

  /**
   * Actually free the {@link BucketEntry}; this may only be invoked when
   * {@link BucketEntry#refCnt} becomes 0.
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * Firstly, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from the
   * bucket cache by calling {@link BucketEntry#markAsEvicted}. If there are still RPCs referring
   * to this block, it can only be de-allocated when all of them release the block.
   * <p>
   * NOTICE: we need to grab the write offset lock first before releasing the reference from the
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}; that would be a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true to indicate whether we've evicted successfully or not.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }
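
  // Illustrative call sequence (hypothetical key): closing an HFile typically leads to
  // evictBlock(key) per cached block of that file; the RAMCache entry (if any) is removed first,
  // then the backingMap entry is removed under the offset write lock via doEvictBlock() below.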

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: when evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} pair can be removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!isCacheEnabled()) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   * it is the return value of current {@link BucketCache#createRecycler} method.
   *
   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
   * it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
  public Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> {
      freeBucketEntry(bucketEntry);
      return;
    };
  }
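
  // Sketch of the recycler life cycle (names from this class): once the last reference to a block
  // backed by this BucketEntry is released, RefCnt runs the Recycler returned above, which calls
  // freeBucketEntry() and hands the bucket space back to bucketAllocator.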

  /**
   * NOTE: This method is only for test.
   */
  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
    if (bucketEntry == null) {
      return false;
    }
    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
  }

  /**
   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: when evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} pair can be removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
    if (!bucketEntry.isRpcRef()) {
      return doEvictBlock(blockCacheKey, bucketEntry, true);
    }
    return false;
  }

  /**
   * Since HBASE-29249, the following properties governing freeSpace behaviour and block priorities
   * were made dynamically configurable:
   * <ul>
   * <li>hbase.bucketcache.acceptfactor</li>
   * <li>hbase.bucketcache.minfactor</li>
   * <li>hbase.bucketcache.extrafreefactor</li>
   * <li>hbase.bucketcache.single.factor</li>
   * <li>hbase.bucketcache.multi.factor</li>
   * <li>hbase.bucketcache.memory.factor</li>
   * </ul>
   * The hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the
   * publishing of blocks for the cache writer threads during prefetch reads only (client reads
   * wouldn't get delayed). It has also been made dynamically configurable since HBASE-29249. The
   * hbase.bucketcache.persist.intervalinmillis property determines the frequency for saving the
   * persistent cache, and it has also been made dynamically configurable since HBASE-29249. The
   * hbase.bucketcache.persistence.chunksize property determines the size of the persistent file
   * splits (due to the limitation of the maximum allowed protobuf size), and it has also been
   * made dynamically configurable since HBASE-29249.
   * @param config the new configuration to be updated.
   */
  @Override
  public void onConfigurationChange(Configuration config) {
    this.acceptableFactor = config.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = config.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor =
      config.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = config.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = config.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = config.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      config.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.bucketcachePersistInterval = config.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
    this.persistenceChunkSize =
      config.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
    sanityCheckConfigs();
  }
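
  // Illustrative dynamic update (hypothetical values): an operator lowers
  // hbase.bucketcache.acceptfactor to 0.90 and hbase.bucketcache.minfactor to 0.80 on a live
  // server; onConfigurationChange re-reads both and sanityCheckConfigs() verifies 0.80 <= 0.90
  // before the new thresholds take effect on the next freeSpace run.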

  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
    return ramCache.remove(cacheKey, re -> {
      if (re != null) {
        this.blockNumber.decrement();
        this.heapSize.add(-1 * re.getData().heapSize());
      }
    });
  }

  public boolean isCacheInconsistent() {
    return isCacheInconsistent.get();
  }

  public void setCacheInconsistent(boolean setCacheInconsistent) {
    isCacheInconsistent.set(setCacheInconsistent);
  }

  protected void setCacheState(CacheState state) {
    cacheState = state;
  }

  /*
   * Statistics thread. Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    if (!isCacheInitialized("BucketCache::logStats")) {
      return;
    }

    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
      + backingMap.size());
    cacheStats.reset();

    bucketAllocator.logDebugStatistics();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.sum();
  }

  public long acceptableSize() {
    if (!isCacheInitialized("BucketCache::acceptableSize")) {
      return 0;
    }
    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
  }

  long getPartitionSize(float partitionFactor) {
    if (!isCacheInitialized("BucketCache::getPartitionSize")) {
      return 0;
    }

    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
  }
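
  // Worked example of the two sizing methods above (defaults from this class, illustrative
  // capacity): with a 10 GB cache, acceptableSize() = 10 GB * 0.95 = 9.5 GB, and the
  // single-access partition is getPartitionSize(0.25) = 10 GB * 0.25 * 0.85 = 2.125 GB.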

  /**
   * Return the count of bucketSizeinfos still needing free space
   */
  private int bucketSizesAboveThresholdCount(float minFactor) {
    if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) {
      return 0;
    }

    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    int fullCount = 0;
    for (int i = 0; i < stats.length; i++) {
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        fullCount++;
      }
    }
    return fullCount;
  }

  /**
   * This method will find the buckets that are minimally occupied and are not reference counted,
   * and will free them completely, without any constraint on the access times of the elements. In
   * the process it will completely free at most the number of buckets passed; sometimes it might
   * free fewer, due to changing refCounts.
   * @param completelyFreeBucketsNeeded number of buckets to free
   **/
  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
    if (!isCacheInitialized("BucketCache::freeEntireBuckets")) {
      return;
    }

    if (completelyFreeBucketsNeeded != 0) {
      // First we will build a set where the offsets are reference counted, usually
      // this set is small around O(Handler Count) unless something else is wrong
      Set<Integer> inUseBuckets = new HashSet<>();
      backingMap.forEach((k, be) -> {
        if (be.isRpcRef()) {
          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
        }
      });
      Set<Integer> candidateBuckets =
        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
        }
      }
    }
  }

  private long calculateBytesToFree(StringBuilder msgBuffer) {
    long bytesToFreeWithoutExtra = 0;
    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    long[] bytesToFreeForBucket = new long[stats.length];
    for (int i = 0; i < stats.length; i++) {
      bytesToFreeForBucket[i] = 0;
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
        bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
        if (msgBuffer != null) {
          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
            + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
        }
      }
    }
    if (msgBuffer != null) {
      msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
    }
    return bytesToFreeWithoutExtra;
  }
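
  // Illustrative freeGoal arithmetic for the two methods above (assumed numbers): a bucket size
  // with totalCount = 1000 and minFactor = 0.85 needs freeGoal = floor(1000 * 0.15) = 150 free
  // slots; if only 40 are free, that size counts as above threshold and 110 * itemSize() bytes
  // are scheduled for freeing.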

  /**
   * Free the space if the used size reaches acceptableSize() or one size block couldn't be
   * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some
   * blocks evicted
   * @param why Why we are being called
   */
  void freeSpace(final String why) {
    if (!isCacheInitialized("BucketCache::freeSpace")) {
      return;
    }
    // Ensure only one freeSpace progress at a time
    if (!freeSpaceLock.tryLock()) {
      return;
    }
    try {
      freeInProgress = true;
      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
      long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer);
      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used="
          + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
          + StringUtils.byteDesc(totalSize));
      }
      long bytesToFreeWithExtra =
        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
      // Instantiate priority buckets
      BucketEntryGroup bucketSingle =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
      BucketEntryGroup bucketMulti =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
      BucketEntryGroup bucketMemory =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

      Set<String> allValidFiles = null;
      // We need the region/stores/files tree, in order to figure out if a block is "orphan" or
      // not. See further comments below for more details.
      if (onlineRegions != null) {
        allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions);
      }
      // the cached time is recorded in nanos, so we need to convert the grace period accordingly
      long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
      long bytesFreed = 0;
      // Check the list of files to determine the cold files which can be readily evicted.
      Map<String, String> coldFiles = null;

      DataTieringManager dataTieringManager = DataTieringManager.getInstance();
      if (dataTieringManager != null) {
        coldFiles = dataTieringManager.getColdFilesList();
      }
      // Scan the entire map, putting each bucket entry into the appropriate bucket entry group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        BlockCacheKey key = bucketEntryWithKey.getKey();
        BucketEntry entry = bucketEntryWithKey.getValue();
        // Under certain conditions, blocks for regions not on the current region server might
        // be hanging on the cache. For example, when using the persistent cache feature, if the
        // RS crashes, then if the same regions are not assigned back once it's online again,
        // blocks for the previously online regions would be recovered and stay in the cache.
        // These would be "orphan" blocks, as the files these blocks belong to are not in any of
        // the online regions.
        // "Orphan" blocks are a pure waste of cache space and should be evicted first during
        // the freespace run.
        // Compactions and flushes may cache blocks before their files are completely written. In
        // these cases the file won't be found in any of the online regions' stores, but the
        // block shouldn't be evicted. To avoid this, we defined the
        // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace
        // period (default 24 hours) before a block is checked for being an orphan block.
        if (
          allValidFiles != null
            && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos)
        ) {
          if (!allValidFiles.contains(key.getHfileName())) {
            if (evictBucketEntryIfNoRpcReferenced(key, entry)) {
              // We calculate the freed bytes, but we don't stop if the goal was reached because
              // these are orphan blocks anyway, so let's leverage this run of freeSpace
              // to get rid of all orphans at once.
              bytesFreed += entry.getLength();
              continue;
            }
          }
        }

        if (
          bytesFreed < bytesToFreeWithExtra && coldFiles != null
            && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
        ) {
          int freedBlockSize = bucketEntryWithKey.getValue().getLength();
          if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
            bytesFreed += freedBlockSize;
          }
          continue;
        }

        switch (entry.getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      // Check if the cold file eviction is sufficient to create enough space.
      bytesToFreeWithExtra -= bytesFreed;
      if (bytesToFreeWithExtra <= 0) {
        LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
          StringUtils.byteDesc(bytesFreed));
        return;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Bucket cache free space completed; freed space : {} "
            + "bytes of cold data blocks. {} more bytes required to be freed.",
          StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
      }

      PriorityQueue<BucketEntryGroup> bucketQueue =
        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();
      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
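
      // Worked example of the loop above (illustrative numbers): groups are polled in ascending
      // overflow order, and each frees min(itsOverflow, remainingBytes / remainingBuckets); e.g.
      // with 150 MB left to free across 3 groups, the first group is asked for at most 50 MB.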

      // Check and free if there are buckets that still need freeing of space
      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
        bucketQueue.clear();
        remainingBuckets = 3;
        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);
        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }
      // Even after the above free we might still need freeing because of the
      // de-fragmentation of the buckets (also called the Slab Calcification problem), i.e.
      // there might be some buckets where the occupancy is very sparse and thus they are not
      // yielding the free for the other bucket sizes; the fix for this is to evict some of the
      // buckets, which we do by evicting the buckets that are least filled
      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
          + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
          + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
          + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
      }
    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to IOEngine.
  class WriterThread extends Thread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;
    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used for test
    void disableWriter() {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {
        while (isCacheEnabled() && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!isCacheEnabled() || !writerEnabled) {
                break;
              }
            }
            doDrain(entries, metaBuff);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled());
    }
  }

  /**
   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
   * cache with a new block for the same cache key. There's a corner case: one thread caches a
   * block in ramCache, copies it to the io-engine and adds a bucket entry to backingMap. Caching
   * another new block with the same cache key does the same thing for the same cache key, so if
   * we don't evict the previous bucket entry, a memory leak happens, because the previous
   * bucketEntry is gone but the bucketAllocator did not free its memory.
   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
   *      cacheKey, Cacheable newBlock)
   * @param key         Block cache key
   * @param bucketEntry Bucket entry to put into backingMap.
   */
  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
    blocksByHFile.add(key);
    updateRegionCachedSize(key, bucketEntry.getLength());
    if (previousEntry != null && previousEntry != bucketEntry) {
      previousEntry.withWriteLock(offsetLock, () -> {
        blockEvicted(key, previousEntry, false, false);
        return null;
      });
    }
  }
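
  // Illustrative consequence of the replacement handling above: if two writer threads cache
  // different blocks under the same key, the losing entry is returned by backingMap.put() and is
  // evicted under its offset write lock, so bucketAllocator reclaims its space instead of leaking.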

  /**
   * Prepare and return a warning message for Bucket Allocator Exception
   * @param fle The exception
   * @param re  The RAMQueueEntry for which the exception was thrown.
   * @return A warning message created from the input RAMQueueEntry object.
   */
  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
    final RAMQueueEntry re) {
    final StringBuilder sb = new StringBuilder();
    sb.append("Most recent failed allocation after ");
    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
    sb.append(" ms;");
    if (re != null) {
      if (re.getData() instanceof HFileBlock) {
        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
        final String tableName = Bytes.toString(fileContext.getTableName());
        if (tableName != null) {
          sb.append(" Table: ");
          sb.append(tableName);
        }
        if (columnFamily != null) {
          sb.append(" CF: ");
          sb.append(columnFamily);
        }
        sb.append(" HFile: ");
        if (fileContext.getHFileName() != null) {
          sb.append(fileContext.getHFileName());
        } else {
          sb.append(re.getKey());
        }
      } else {
        sb.append(" HFile: ");
        sb.append(re.getKey());
      }
    }
    sb.append(" Message: ");
    sb.append(fle.getMessage());
    return sb.toString();
  }
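
  // Usage sketch (illustrative; 'lastLogTs' is a hypothetical stand-in for allocFailLogPrevTs):
  // the caller throttles this warning so at most one allocation failure is logged per
  // ALLOCATION_FAIL_LOG_TIME_PERIOD, while every failure still counts in cacheStats:
  //
  //   long now = EnvironmentEdgeManager.currentTime();
  //   if (lastLogTs == 0 || (now - lastLogTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD) {
  //     LOG.warn(getAllocationFailWarningMessage(exception, entry));
  //     lastLogTs = now;
  //   }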

  /**
   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all
   * that are passed in, even on failure, being sure to remove them from ramCache; otherwise we
   * would never undo the references and we would OOME.
   * @param entries Presumes list passed in here will be processed by this invocation only. No
   *                interference expected.
   */
  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff)
    throws InterruptedException {
    if (entries.isEmpty()) {
      return;
    }
    // This method is a little hard to follow. We run through the passed in entries and for each
    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
    // filling ramCache. We do the clean up by again running through the passed in entries
    // doing extra work when we find a non-null bucketEntries corresponding entry.
    final int size = entries.size();
    BucketEntry[] bucketEntries = new BucketEntry[size];
    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
    // when we go to add an entry by going around the loop again without upping the index.
    int index = 0;
    while (isCacheEnabled() && index < size) {
      RAMQueueEntry re = null;
      try {
        re = entries.get(index);
        if (re == null) {
          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
          index++;
          continue;
        }
        // Reset the position for reuse.
        // It should be guaranteed that the data in the metaBuff has been transferred to the
        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
        // transferred with our current IOEngines. Should take care, when we have new kinds of
        // IOEngine in the future.
        metaBuff.clear();
        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
          this::createRecycler, metaBuff, acceptableSize());
        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
        bucketEntries[index] = bucketEntry;
        if (ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        index++;
      } catch (BucketAllocatorException fle) {
        long currTs = EnvironmentEdgeManager.currentTime();
        cacheStats.allocationFailed(); // Record the warning.
        if (
          allocFailLogPrevTs == 0
            || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
        ) {
          LOG.warn(getAllocationFailWarningMessage(fle, re));
          allocFailLogPrevTs = currTs;
        }
        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
        bucketEntries[index] = null;
        index++;
      } catch (CacheFullException cfe) {
        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
        if (!freeInProgress && !re.isPrefetch()) {
          freeSpace("Full!");
        } else if (re.isPrefetch()) {
          bucketEntries[index] = null;
          index++;
        } else {
          Thread.sleep(50);
        }
      } catch (IOException ioex) {
        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
        LOG.error("Failed writing to bucket cache", ioex);
        checkIOErrorIsTolerated();
      }
    }

    // Make sure data pages are written on media before we update maps.
    try {
      ioEngine.sync();
    } catch (IOException ioex) {
      LOG.error("Failed syncing IO engine", ioex);
      checkIOErrorIsTolerated();
      // Since we failed sync, free the blocks in bucket allocator
      for (int i = 0; i < entries.size(); ++i) {
        BucketEntry bucketEntry = bucketEntries[i];
        if (bucketEntry != null) {
          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
          bucketEntries[i] = null;
        }
      }
    }

    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
    // success or error.
    for (int i = 0; i < size; ++i) {
      BlockCacheKey key = entries.get(i).getKey();
      // Only add if non-null entry.
      if (bucketEntries[i] != null) {
        putIntoBackingMap(key, bucketEntries[i]);
        if (ioEngine.isPersistent()) {
          setCacheInconsistent(true);
        }
      }
      // Always remove from ramCache even if we failed adding it to the block cache above.
      boolean existed = ramCache.remove(key, re -> {
        if (re != null) {
          heapSize.add(-1 * re.getData().heapSize());
        }
      });
      if (!existed && bucketEntries[i] != null) {
        // Block should have already been evicted. Remove it and free space.
        final BucketEntry bucketEntry = bucketEntries[i];
        bucketEntry.withWriteLock(offsetLock, () -> {
          if (backingMap.remove(key, bucketEntry)) {
            blockEvicted(key, bucketEntry, false, false);
          }
          return null;
        });
      }
      long used = bucketAllocator.getUsedSize();
      if (!entries.get(i).isPrefetch() && used > acceptableSize()) {
        LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey());
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
    }
  }

  /**
   * Blocks until elements available in {@code q} then tries to grab as many as possible before
   * returning.
   * @param receptacle Where to stash the elements taken from queue. We clear before we use it
   *                   just in case.
   * @param q          The queue to take from.
   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
   */
  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
    List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
    // ok even if list grew to accommodate thousands.
    receptacle.clear();
    receptacle.add(q.take());
    q.drainTo(receptacle);
    return receptacle;
  }
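
  // Usage sketch (illustrative; 'writerQueue' is a hypothetical stand-in for one of the writer
  // queues): how WriterThread consumes the queue in batches. take() parks the thread until at
  // least one entry is available, and drainTo() then empties whatever else has accumulated, so
  // each doDrain call works on a batch rather than a single block:
  //
  //   List<RAMQueueEntry> batch = new ArrayList<>();
  //   batch = getRAMQueueEntries(writerQueue, batch); // blocks for the first entry
  //   // batch now holds 1..n entries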

  /**
   * @see #retrieveFromFile(int[])
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
      justification = "false positive, try-with-resources ensures close is called.")
  void persistToFile() throws IOException {
    LOG.debug("Thread {} started persisting bucket cache to file",
      Thread.currentThread().getName());
    if (!isCachePersistent()) {
      throw new IOException("Attempt to persist non-persistent cache mappings!");
    }
    File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
      LOG.debug("Persist in new chunked persistence format.");

      persistChunkedBackingMap(fos);

      LOG.debug(
        "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {},"
          + " file name: {}",
        backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName());
    } catch (IOException e) {
      LOG.error("Failed to persist bucket cache to file", e);
      throw e;
    } catch (Throwable e) {
      LOG.error("Failed during persist bucket cache to file: ", e);
      throw e;
    }
    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
      Thread.currentThread().getName());
    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
      LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if "
        + "RS crashes/restarts before we successfully checkpoint again.");
    }
  }

  public boolean isCachePersistent() {
    return ioEngine.isPersistent() && persistencePath != null;
  }

  @Override
  public Optional<Map<String, Long>> getRegionCachedInfo() {
    return Optional.of(Collections.unmodifiableMap(regionCachedSize));
  }
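
  // Design note (a sketch restating persistToFile above, not additional behavior): crash safety
  // comes from the classic write-to-temp-then-rename pattern. The map is fully written to
  // "<persistencePath><timestamp>" first, so a crash mid-write leaves the previous checkpoint at
  // persistencePath untouched; only the final rename publishes the new one. In pseudo-steps:
  //
  //   File tmp = new File(persistencePath + now); // unique temp name
  //   writeEverything(tmp);                       // may fail; old checkpoint still valid
  //   tmp.renameTo(new File(persistencePath));    // single publish step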

  /**
   * @see #persistToFile()
   */
  private void retrieveFromFile(int[] bucketSizes) throws IOException {
    LOG.info("Started retrieving bucket cache from file");
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      LOG.warn("Persistence file missing! "
        + "It's ok if it's first run after enabling persistent cache.");
      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
      blockNumber.add(backingMap.size());
      backingMapValidated.set(true);
      return;
    }
    assert !isCacheEnabled();

    try (FileInputStream in = new FileInputStream(persistenceFile)) {
      int pblen = ProtobufMagic.lengthOfPBMagic();
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);

      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
        LOG.info("Reading old format of persistence.");
        // The old non-chunked version of backing map persistence.
        BucketCacheProtos.BucketCacheEntry cacheEntry =
          BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
        if (cacheEntry == null) {
          throw new IOException(
            "Failed to parse cache entry from persistence file: " + persistencePath);
        }
        parsePB(cacheEntry);
      } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
        // The new persistence format of chunked persistence.
        LOG.info("Reading new chunked format of persistence.");
        retrieveChunkedBackingMap(in);
      } else {
        // In 3.0 we have enough flexibility to dump the old cache data.
        // TODO: In 2.x line, this might need to be filled in to support reading the old format
        throw new IOException(
          "Persistence file does not start with protobuf magic number. " + persistencePath);
      }
      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
      blockNumber.add(backingMap.size());
      LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
    }
  }

  private void updateRegionSizeMapWhileRetrievingFromFile() {
    // Update the regionCachedSize with the region size while restarting the region server
    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating region size map after retrieving cached file list");
      dumpPrefetchList();
    }
    regionCachedSize.clear();
    backingMap.forEach((k, v) -> updateRegionCachedSize(k, v.getLength()));
  }

  private void dumpPrefetchList() {
    for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) {
      LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(),
        outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond());
    }
  }

  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
    throws IOException {
    if (capacitySize != cacheCapacity) {
      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
    }
    if (!ioEngine.getClass().getName().equals(ioclass)) {
      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
        + ioEngine.getClass().getName());
    }
    if (!backingMap.getClass().getName().equals(mapclass)) {
      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
        + backingMap.getClass().getName());
    }
  }
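
  // Format detection sketch (restating the read path above): the first bytes of the persistence
  // file act as a version tag, read as a single fixed-width prefix
  // (ProtobufMagic.lengthOfPBMagic() bytes) before any delimited protobuf parsing starts:
  //
  //   [PB magic]    -> legacy single-blob format, parsePB(...)
  //   [PB_MAGIC_V2] -> chunked format, retrieveChunkedBackingMap(...)
  //   anything else -> IOException, refuse to guess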
" 1683 + "This may take some time, so we'll do it in a background thread,"); 1684 1685 Runnable cacheValidator = () -> { 1686 while (bucketAllocator == null) { 1687 try { 1688 Thread.sleep(50); 1689 } catch (InterruptedException ex) { 1690 throw new RuntimeException(ex); 1691 } 1692 } 1693 long startTime = EnvironmentEdgeManager.currentTime(); 1694 int totalKeysOriginally = backingMap.size(); 1695 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1696 try { 1697 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1698 } catch (IOException e1) { 1699 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1700 evictBlock(keyEntry.getKey()); 1701 fileNotFullyCached(keyEntry.getKey(), keyEntry.getValue()); 1702 } 1703 } 1704 backingMapValidated.set(true); 1705 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", 1706 totalKeysOriginally, backingMap.size(), 1707 (EnvironmentEdgeManager.currentTime() - startTime)); 1708 }; 1709 Thread t = new Thread(cacheValidator); 1710 t.setDaemon(true); 1711 t.start(); 1712 } 1713 } 1714 1715 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1716 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1717 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1718 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1719 pair2.getFirst().forEach((k, v) -> { 1720 backingMap.put(k, v); 1721 updateRegionCachedSize(k, v.getLength()); 1722 }); 1723 blocksByHFile.addAll(pair2.getSecond()); 1724 } 1725 1726 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1727 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1728 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1729 this::createRecycler); 1730 backingMap = pair.getFirst(); 1731 blocksByHFile = pair.getSecond(); 1732 fullyCachedFiles.clear(); 1733 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1734 1735 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1736 fullyCachedFiles.size()); 1737 1738 verifyFileIntegrity(proto); 1739 updateRegionSizeMapWhileRetrievingFromFile(); 1740 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1741 } 1742 1743 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1744 LOG.debug( 1745 "persistToFile: before persisting backing map size: {}, " 1746 + "fullycachedFiles size: {}, chunkSize: {}", 1747 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1748 1749 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1750 1751 LOG.debug( 1752 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1753 backingMap.size(), fullyCachedFiles.size()); 1754 } 1755 1756 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1757 // Read the first chunk that has all the details. 1758 BucketCacheProtos.BucketCacheEntry cacheEntry = 1759 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1760 1761 // HBASE-29857: Handle case where persistence file is empty. 1762 // parseDelimitedFrom() returns null when there's no data to read. 1763 // Note: Corrupted files would throw InvalidProtocolBufferException (subclass of IOException). 

  private void retrieveChunkedBackingMap(FileInputStream in) throws IOException {
    // Read the first chunk that has all the details.
    BucketCacheProtos.BucketCacheEntry cacheEntry =
      BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);

    // HBASE-29857: Handle case where persistence file is empty.
    // parseDelimitedFrom() returns null when there's no data to read.
    // Note: Corrupted files would throw InvalidProtocolBufferException (subclass of IOException).
    if (cacheEntry == null) {
      throw new IOException("Failed to read cache entry from persistence file (file is empty)");
    }

    fullyCachedFiles.clear();
    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap()));

    backingMap.clear();
    blocksByHFile.clear();
    regionCachedSize.clear();

    // Read the backing map entries in batches.
    int numChunks = 0;
    while (in.available() > 0) {
      updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in),
        cacheEntry.getDeserializersMap());
      numChunks++;
    }

    LOG.info("Retrieved {} chunks with blockCount = {}.", numChunks, backingMap.size());
    verifyFileIntegrity(cacheEntry);
    verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(),
      cacheEntry.getMapClass());
  }

  /**
   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
   * exceeds ioErrorsTolerationDuration, we will disable the cache
   */
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTime();
    // Do a single read to a local variable to avoid timing issue - HBASE-24454
    long ioErrorStartTimeTmp = this.ioErrorStartTime;
    if (ioErrorStartTimeTmp > 0) {
      if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
          + "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }
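
  // Timeline sketch (illustrative): how the error window above works. ioErrorStartTime records
  // the start of the *current* error streak; any successful write resets it to -1 in doDrain, so
  // only a continuous run of failures can disable the cache:
  //
  //   t0        write fails -> ioErrorStartTime = t0 (streak starts)
  //   t0 + x    write fails -> tolerated while x <= ioErrorsTolerationDuration
  //   later     write OK    -> ioErrorStartTime = -1 (streak over)
  //   a streak of failures longer than ioErrorsTolerationDuration disables the cache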

  /**
   * Used to shut down the cache -or- turn it off in the case of something broken.
   */
  private void disableCache() {
    if (!isCacheEnabled()) {
      return;
    }
    LOG.info("Disabling cache");
    cacheState = CacheState.DISABLED;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].interrupt();
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Not persistent, or no persistence path: the backingMap cannot be serialized out, so drop
      // the in-memory index structures as well. Otherwise we keep them for persistToFile.
      this.backingMap.clear();
      this.blocksByHFile.clear();
      this.fullyCachedFiles.clear();
      this.regionCachedSize.clear();
    }
    if (cacheStats.getMetricsRollerScheduler() != null) {
      cacheStats.getMetricsRollerScheduler().shutdownNow();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].join();
  }

  @Override
  public void shutdown() {
    if (isCacheEnabled()) {
      disableCache();
      LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
        + "; path to write=" + persistencePath);
      if (ioEngine.isPersistent() && persistencePath != null) {
        try {
          join();
          if (cachePersister != null) {
            LOG.info("Shutting down cache persister thread.");
            cachePersister.shutdown();
            while (cachePersister.isAlive()) {
              Thread.sleep(10);
            }
          }
          persistToFile();
        } catch (IOException ex) {
          LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
        } catch (InterruptedException e) {
          LOG.warn("Failed to persist data on exit", e);
        }
      }
    }
  }

  /**
   * Needed mostly for UTs that might run in the same VM and create different BucketCache
   * instances on different UT methods.
   */
  @Override
  protected void finalize() {
    if (cachePersister != null && !cachePersister.isInterrupted()) {
      cachePersister.interrupt();
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.sum();
  }

  @Override
  public long size() {
    return this.realCacheSize.sum();
  }

  @Override
  public long getCurrentDataSize() {
    return size();
  }

  @Override
  public long getFreeSize() {
    if (!isCacheInitialized("BucketCache:getFreeSize")) {
      return 0;
    }
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.sum();
  }

  @Override
  public long getDataBlockCount() {
    return getBlockCount();
  }

  @Override
  public long getCurrentSize() {
    if (!isCacheInitialized("BucketCache::getCurrentSize")) {
      return 0;
    }
    return this.bucketAllocator.getUsedSize();
  }

  protected String getAlgorithm() {
    return algorithm;
  }

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE);
  }
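
  // Usage note (illustrative): evict-on-close funnels through the range variant below with the
  // widest possible window, e.g. evictBlocksByHfileName("f1") is equivalent to
  // evictBlocksRangeByHfileName("f1", 0, Long.MAX_VALUE), covering every block offset of the
  // file.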

  @Override
  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset);
    // We can't assume we are evicting all blocks for this given file. In case of
    // split references, we might be evicting just half of the blocks
    LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(),
      hfileName, initOffset, endOffset);
    return evictBlockSet(keySet);
  }

  private int evictBlockSet(Set<BlockCacheKey> keySet) {
    int numEvicted = 0;
    for (BlockCacheKey key : keySet) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }
    return numEvicted;
  }

  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) {
    Set<BlockCacheKey> cacheKeys = new HashSet<>();
    // At this moment, some bucket entries may still be in the WriterThread queue and not yet put
    // into the backingMap. So, when executing this method, we should check both the RAMCache and
    // the backingMap to ensure all CacheKeys are obtained.
    // For more details, please refer to HBASE-29862.
    Set<BlockCacheKey> ramCacheKeySet = ramCache.getRamBlockCacheKeysForHFile(hfileName);
    for (BlockCacheKey key : ramCacheKeySet) {
      if (key.getOffset() >= init && key.getOffset() <= end) {
        cacheKeys.add(key);
      }
    }

    // These keys are just for comparison and are short lived, so we need only file name and offset
    cacheKeys.addAll(blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
      new BlockCacheKey(hfileName, end), true));
    return cacheKeys;
  }
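
  // Lookup sketch (restating the method above): keys for one file can live in two places at
  // once, so both sources are merged. blocksByHFile is a sorted set, which is what makes the
  // offset-range lookup cheap:
  //
  //   ramCache:      blocks still queued for write, not yet in backingMap
  //   blocksByHFile: subSet([hfileName, init] .. [hfileName, end]) over already-written blocks
  //
  // Missing either source could, e.g., leave a just-prefetched block alive after its file was
  // removed (see HBASE-29862).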

  /**
   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
   * number of elements out of each according to configuration parameters and their relative
   * sizes.
   */
  private class BucketEntryGroup {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      // TODO avoid a cycling situation. We find no block which is not in use and so no way to
      // free. What to do then? Caching attempt fail? Need some changes in cacheBlock API?
      while ((entry = queue.pollLast()) != null) {
        BlockCacheKey blockCacheKey = entry.getKey();
        BucketEntry be = entry.getValue();
        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
          freedBytes += be.getLength();
        }
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }
  }

  /**
   * Block entry stored in memory, with its key, data, and so on.
   */
  static class RAMQueueEntry {
    private final BlockCacheKey key;
    private final Cacheable data;
    private long accessCounter;
    private boolean inMemory;
    private boolean isCachePersistent;

    private boolean isPrefetch;

    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
      boolean isCachePersistent, boolean isPrefetch) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
      this.isCachePersistent = isCachePersistent;
      this.isPrefetch = isPrefetch;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public boolean isPrefetch() {
      return isPrefetch;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

    private ByteBuffAllocator getByteBuffAllocator() {
      if (data instanceof HFileBlock) {
        return ((HFileBlock) data).getByteBuffAllocator();
      }
      return ByteBuffAllocator.HEAP;
    }
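
    // Layout sketch for what writeToCache below puts in the IOEngine (persistent case; offsets
    // are relative to the allocated block):
    //
    //   [cachedTime: Long.BYTES][serialized block data][block metadata: metaBuff]
    //
    // The Long.BYTES prefix only exists when isCachePersistent, so recovery can compare the
    // stored cache time against the backing map entry and detect stale data after a restart.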

    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
      ByteBuffer metaBuff, final Long acceptableSize) throws IOException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) {
        return null;
      }
      if (isCachePersistent && data instanceof HFileBlock) {
        len += Long.BYTES; // we need to record the cache time for consistency check in case of
                           // recovery
      }
      long offset = alloc.allocateBlock(len);
      // In the case of prefetch, we want to avoid freeSpace runs when the cache is full.
      // This makes the cache allocation more predictable, and is particularly important
      // when persistent cache is enabled, as it won't trigger evictions of the recovered blocks,
      // which are likely the most accessed and relevant blocks in the cache.
      if (isPrefetch() && alloc.getUsedSize() > acceptableSize) {
        alloc.freeBlock(offset, len);
        return null;
      }
      boolean succ = false;
      BucketEntry bucketEntry = null;
      try {
        int diskSizeWithHeader = (data instanceof HFileBlock)
          ? ((HFileBlock) data).getOnDiskSizeWithHeader()
          : data.getSerializedLength();
        bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory,
          createRecycler, getByteBuffAllocator());
        bucketEntry.setDeserializerReference(data.getDeserializer());
        if (data instanceof HFileBlock) {
          // If an instance of HFileBlock, save on some allocations.
          HFileBlock block = (HFileBlock) data;
          ByteBuff sliceBuf = block.getBufferReadOnly();
          block.getMetaData(metaBuff);
          // Write the cache time before the block and metadata parts.
          if (isCachePersistent) {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(bucketEntry.getCachedTime());
            buffer.rewind();
            ioEngine.write(buffer, offset);
            ioEngine.write(sliceBuf, (offset + Long.BYTES));
          } else {
            ioEngine.write(sliceBuf, offset);
          }
          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
        } else {
          // Only used for testing.
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb, true);
          ioEngine.write(bb, offset);
        }
        succ = true;
      } finally {
        if (!succ) {
          alloc.freeBlock(offset, len);
        }
      }
      realCacheSize.add(len);
      return bucketEntry;
    }
  }

  /**
   * Only used in test
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }
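
  // Usage sketch (illustrative; 'bucketCache' stands in for a BucketCache reference): the
  // iterator below exposes a read-only view over the backing map, suitable for dumping cache
  // contents in status tools:
  //
  //   for (CachedBlock cb : bucketCache) {
  //     System.out.println(cb.getFilename() + "@" + cb.getOffset() + " " + cb.getSize());
  //   }
  //
  // ramCache contents are deliberately skipped since entries only live there briefly.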

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramcache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
      this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;

            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(
                "" + this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  public int getRpcRefCount(BlockCacheKey cacheKey) {
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
    }
    return 0;
  }

  float getAcceptableFactor() {
    return acceptableFactor;
  }

  float getMinFactor() {
    return minFactor;
  }

  float getExtraFreeFactor() {
    return extraFreeFactor;
  }

  float getSingleFactor() {
    return singleFactor;
  }

  float getMultiFactor() {
    return multiFactor;
  }

  float getMemoryFactor() {
    return memoryFactor;
  }

  long getQueueAdditionWaitTime() {
    return queueAdditionWaitTime;
  }

  long getPersistenceChunkSize() {
    return persistenceChunkSize;
  }

  long getBucketcachePersistInterval() {
    return bucketcachePersistInterval;
  }

  public String getPersistencePath() {
    return persistencePath;
  }
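
  // Reference counting sketch (restating conventions visible in this class): a block's refCnt
  // includes one reference held by the cache itself plus one per active RPC reader, which is why
  // getRpcRefCount above subtracts 1 unless the entry is already markedAsEvicted:
  //
  //   refCnt=1, not evicted     -> 0 RPC readers
  //   refCnt=3, not evicted     -> 2 RPC readers
  //   refCnt=2, markedAsEvicted -> 2 RPC readers
  //
  // The RAMCache below enforces the same invariant on the write path: retain() on insert,
  // release() on removal, and atomic retain-on-get so a reader can never resurrect a refCnt=0
  // block.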

  /**
   * Wraps the delegate ConcurrentMap while maintaining the reference counts of its blocks.
   */
  static class RAMCache {
    /**
     * Defined the map as {@link ConcurrentHashMap} explicitly here, because in
     * {@link RAMCache#get(BlockCacheKey)} and
     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)}, we need to
     * guarantee the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func).
     * Besides, the func method can execute exactly once only when the key is present (or absent)
     * and under the lock context. Otherwise, the reference count of the block will be messed up.
     * Notice that {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
     */
    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();

    public boolean containsKey(BlockCacheKey key) {
      return delegate.containsKey(key);
    }

    public RAMQueueEntry get(BlockCacheKey key) {
      return delegate.computeIfPresent(key, (k, re) -> {
        // It'll be referenced by RPC, so retain atomically here. If the get and retain were not
        // atomic, another thread could remove and release the block, and when retaining in this
        // thread we might retain a block with refCnt=0, which is disallowed. (see HBASE-22422)
        re.getData().retain();
        return re;
      });
    }

    /**
     * Return the previous associated value, or null if absent. It has the same meaning as
     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
     */
    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
      AtomicBoolean absent = new AtomicBoolean(false);
      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
        // The RAMCache now references this entry, so its reference count should be incremented.
        entry.getData().retain();
        absent.set(true);
        return entry;
      });
      return absent.get() ? null : re;
    }

    public boolean remove(BlockCacheKey key) {
      return remove(key, re -> {
      });
    }

    /**
     * Defined a {@link Consumer} here, because once the removed entry releases its reference
     * count, its ByteBuffers may be recycled, and accessing it outside this method will throw an
     * exception. The consumer gets to access the entry to remove before its reference count is
     * released. Notice, don't change its reference count in the {@link Consumer}.
     */
    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
      RAMQueueEntry previous = delegate.remove(key);
      action.accept(previous);
      if (previous != null) {
        previous.getData().release();
      }
      return previous != null;
    }

    public boolean isEmpty() {
      return delegate.isEmpty();
    }

    public void clear() {
      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
      while (it.hasNext()) {
        RAMQueueEntry re = it.next().getValue();
        it.remove();
        re.getData().release();
      }
    }

    public boolean hasBlocksForFile(String fileName) {
      return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName))
        .findFirst().isPresent();
    }

    public Set<BlockCacheKey> getRamBlockCacheKeysForHFile(String fileName) {
      Set<BlockCacheKey> ramCacheKeySet = new HashSet<>();
      for (BlockCacheKey blockCacheKey : delegate.keySet()) {
        if (blockCacheKey.getHfileName().equals(fileName)) {
          ramCacheKeySet.add(blockCacheKey);
        }
      }
      return ramCacheKeySet;
    }
  }

  public Map<BlockCacheKey, BucketEntry> getBackingMap() {
    return backingMap;
  }

  public AtomicBoolean getBackingMapValidated() {
    return backingMapValidated;
  }

  @Override
  public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
    return Optional.of(fullyCachedFiles);
  }
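
  // Usage sketch (illustrative) for the helper below: callers that only have a CacheConfig can
  // unwrap a BucketCache whether it is the whole block cache or the L2 of a CombinedBlockCache:
  //
  //   getBucketCacheFromCacheConfig(cacheConf)
  //     .ifPresent(bc -> LOG.info("bucket cache used size: {}", bc.getCurrentSize()));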

  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
    if (cacheConf.getBlockCache().isPresent()) {
      BlockCache bc = cacheConf.getBlockCache().get();
      if (bc instanceof CombinedBlockCache) {
        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
        if (l2 instanceof BucketCache) {
          return Optional.of((BucketCache) l2);
        }
      } else if (bc instanceof BucketCache) {
        return Optional.of((BucketCache) bc);
      }
    }
    return Optional.empty();
  }

  @Override
  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
    long size) {
    // block eviction may be happening in the background as prefetch runs,
    // so we need to count all blocks for this file in the backing map under
    // a read lock for the block offset
    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
      fileName, totalBlockCount, dataBlockCount);
    try {
      int count = 0;
      LOG.debug("iterating over {} entries in the backing map", backingMap.size());
      Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE);
      if (result.isEmpty() && StoreFileInfo.isReference(fileName)) {
        result = getAllCacheKeysForFile(
          StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0,
          Long.MAX_VALUE);
      }
      for (BlockCacheKey entry : result) {
        LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
          fileName.getName(), entry.getOffset());
        ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset());
        lock.readLock().lock();
        locks.add(lock);
        if (backingMap.containsKey(entry) && entry.getBlockType().isData()) {
          count++;
        }
      }
      // BucketCache would only have data blocks
      if (dataBlockCount == count) {
        LOG.debug("File {} has now been fully cached.", fileName);
        fileCacheCompleted(fileName, size);
      } else {
        LOG.debug(
          "Prefetch executor completed for {}, but only {} data blocks were cached. "
            + "Total data blocks for file: {}. "
            + "Checking for blocks pending cache in cache writer queue.",
          fileName, count, dataBlockCount);
        if (ramCache.hasBlocksForFile(fileName.getName())) {
          for (ReentrantReadWriteLock lock : locks) {
            lock.readLock().unlock();
          }
          locks.clear();
          LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
            + "and try the verification again.", fileName);
          Thread.sleep(100);
          notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
        } else {
          LOG.info(
            "The total block count was {}. We found only {} data blocks cached from "
              + "a total of {} data blocks for file {}, "
              + "but no blocks pending caching. Maybe cache is full or evictions "
              + "happened concurrently to cache prefetch.",
            totalBlockCount, count, dataBlockCount, fileName);
        }
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      for (ReentrantReadWriteLock lock : locks) {
        lock.readLock().unlock();
      }
    }
  }
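
  // Note on the retry above (restating the code, not adding behavior): before the recursive
  // re-verification, every read lock acquired so far is released and the list is cleared;
  // otherwise each retry would stack additional read locks on the same offsets while sleeping.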

  @Override
  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
    if (!isCacheInitialized("blockFitsIntoTheCache")) {
      return Optional.of(false);
    }

    long currentUsed = bucketAllocator.getUsedSize();
    boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
    return Optional.of(result);
  }

  @Override
  public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
    String fileName = hFileInfo.getHFileContext().getHFileName();
    DataTieringManager dataTieringManager = DataTieringManager.getInstance();
    if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) {
      LOG.debug("Custom tiering is enabled for file: '{}' and it is not hot data", fileName);
      // If custom tiering has been just enabled for a file that was cached, we now need
      // to evict it.
      Set<BlockCacheKey> keySet =
        getAllCacheKeysForFile(hFileInfo.getHFileContext().getHFileName(), 0, Long.MAX_VALUE);
      int evictedBlocks = evictBlockSet(keySet);
      if (evictedBlocks > 0) {
        LOG.debug(
          "Evicted {} blocks for file {} as it is now considered cold by DataTieringManager",
          evictedBlocks, fileName);
      }
      return Optional.of(false);
    }
    // if we don't have the file in fullyCachedFiles, we should cache it
    return Optional.of(!fullyCachedFiles.containsKey(fileName));
  }

  @Override
  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
    Configuration conf) {
    DataTieringManager dataTieringManager = DataTieringManager.getInstance();
    if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) {
      LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
        key.getHfileName());
      return Optional.of(false);
    }
    return Optional.of(true);
  }

  @Override
  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
    boolean foundKey = backingMap.containsKey(key);
    // if there's no entry for the key itself, we need to check if this key is for a reference,
    // and if so, look for a block from the referenced file using the getBlockForReference method.
    return Optional.of(foundKey ? true : getBlockForReference(key) != null);
  }

  @Override
  public Optional<Integer> getBlockSize(BlockCacheKey key) {
    BucketEntry entry = backingMap.get(key);
    if (entry == null) {
      // the key might be for a reference; we may have found the block from the referenced file
      // in the cache when we first tried to cache it.
      entry = getBlockForReference(key);
      return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader());
    } else {
      return Optional.of(entry.getOnDiskSizeWithHeader());
    }
  }

  boolean isCacheInitialized(String api) {
    if (cacheState == CacheState.INITIALIZING) {
      LOG.warn("Bucket initialisation pending at {}", api);
      return false;
    }
    return true;
  }

  @Override
  public boolean waitForCacheInitialization(long timeout) {
    while (cacheState == CacheState.INITIALIZING) {
      if (timeout <= 0) {
        break;
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for cache initialization", e);
        Thread.currentThread().interrupt();
        break;
      }
      timeout -= 100;
    }
    return isCacheEnabled();
  }
}