001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIODS; 021import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_STATS_PERIOD_MINUTES_KEY; 022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIODS; 023import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 025 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.FileOutputStream; 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.Comparator; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.NavigableSet; 040import java.util.Optional; 041import java.util.PriorityQueue; 042import java.util.Set; 043import java.util.concurrent.ArrayBlockingQueue; 044import java.util.concurrent.BlockingQueue; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.concurrent.ConcurrentSkipListSet; 048import java.util.concurrent.Executors; 049import java.util.concurrent.ScheduledExecutorService; 050import java.util.concurrent.TimeUnit; 051import java.util.concurrent.atomic.AtomicBoolean; 052import java.util.concurrent.atomic.AtomicLong; 053import java.util.concurrent.atomic.LongAdder; 054import java.util.concurrent.locks.Lock; 055import java.util.concurrent.locks.ReentrantLock; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import java.util.function.Consumer; 058import java.util.function.Function; 059import org.apache.commons.io.IOUtils; 060import org.apache.commons.lang3.mutable.MutableInt; 061import org.apache.hadoop.conf.Configuration; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.hbase.HBaseConfiguration; 064import org.apache.hadoop.hbase.HBaseIOException; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.client.Admin; 067import org.apache.hadoop.hbase.io.ByteBuffAllocator; 068import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 069import org.apache.hadoop.hbase.io.HeapSize; 070import org.apache.hadoop.hbase.io.hfile.BlockCache; 071import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 072import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 073import org.apache.hadoop.hbase.io.hfile.BlockPriority; 074import org.apache.hadoop.hbase.io.hfile.BlockType; 075import org.apache.hadoop.hbase.io.hfile.CacheConfig; 076import org.apache.hadoop.hbase.io.hfile.CacheStats; 077import org.apache.hadoop.hbase.io.hfile.Cacheable; 078import org.apache.hadoop.hbase.io.hfile.CachedBlock; 079import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 080import org.apache.hadoop.hbase.io.hfile.HFileBlock; 081import org.apache.hadoop.hbase.io.hfile.HFileContext; 082import org.apache.hadoop.hbase.io.hfile.HFileInfo; 083import org.apache.hadoop.hbase.nio.ByteBuff; 084import org.apache.hadoop.hbase.nio.RefCnt; 085import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 086import org.apache.hadoop.hbase.regionserver.DataTieringManager; 087import org.apache.hadoop.hbase.regionserver.HRegion; 088import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 089import org.apache.hadoop.hbase.util.Bytes; 090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 091import org.apache.hadoop.hbase.util.IdReadWriteLock; 092import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 093import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 094import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 095import org.apache.hadoop.hbase.util.Pair; 096import org.apache.hadoop.util.StringUtils; 097import org.apache.yetus.audience.InterfaceAudience; 098import org.slf4j.Logger; 099import org.slf4j.LoggerFactory; 100 101import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 102import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 103 104import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 105 106/** 107 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 108 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 109 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 110 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 111 * {@link FileIOEngine} to store/read the block data. 112 * <p> 113 * Eviction is via a similar algorithm as used in 114 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 115 * <p> 116 * BucketCache can be used as mainly a block cache (see 117 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 118 * decrease CMS GC and heap fragmentation. 119 * <p> 120 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 121 * enlarge cache space via a victim cache. 122 */ 123@InterfaceAudience.Private 124public class BucketCache implements BlockCache, HeapSize { 125 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 126 127 /** Priority buckets config */ 128 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 129 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 130 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 131 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 132 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 133 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 134 static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 135 "hbase.bucketcache.persistence.chunksize"; 136 137 /** Use strong reference for offsetLock or not */ 138 private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref"; 139 private static final boolean STRONG_REF_DEFAULT = false; 140 141 /** The cache age of blocks to check if the related file is present on any online regions. */ 142 static final String BLOCK_ORPHAN_GRACE_PERIOD = 143 "hbase.bucketcache.block.orphan.evictgraceperiod.seconds"; 144 145 static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L; 146 147 /** Priority buckets */ 148 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 149 static final float DEFAULT_MULTI_FACTOR = 0.50f; 150 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 151 static final float DEFAULT_MIN_FACTOR = 0.85f; 152 153 static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 154 static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 155 156 // Number of blocks to clear for each of the bucket size that is full 157 static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 158 159 /** Statistics thread */ 160 private static final int statThreadPeriod = 5 * 60; 161 162 final static int DEFAULT_WRITER_THREADS = 3; 163 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 164 165 final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000; 166 167 // Store/read block data 168 transient final IOEngine ioEngine; 169 170 // Store the block in this map before writing it to cache 171 transient final RAMCache ramCache; 172 173 // In this map, store the block's meta data like offset, length 174 transient Map<BlockCacheKey, BucketEntry> backingMap; 175 176 private AtomicBoolean backingMapValidated = new AtomicBoolean(false); 177 178 /** 179 * Map of hFile -> Region -> File size. This map is used to track all files completed prefetch, 180 * together with the region those belong to and the total cached size for the 181 * region.TestBlockEvictionOnRegionMovement 182 */ 183 transient final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); 184 /** 185 * Map of region -> total size of the region prefetched on this region server. This is the total 186 * size of hFiles for this region prefetched on this region server 187 */ 188 final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>(); 189 190 private transient BucketCachePersister cachePersister; 191 192 /** 193 * Enum to represent the state of cache 194 */ 195 protected enum CacheState { 196 // Initializing: State when the cache is being initialised from persistence. 197 INITIALIZING, 198 // Enabled: State when cache is initialised and is ready. 199 ENABLED, 200 // Disabled: State when the cache is disabled. 201 DISABLED 202 } 203 204 /** 205 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 206 * that Bucket IO exceptions/errors don't bring down the HBase server. 207 */ 208 private volatile CacheState cacheState; 209 210 /** 211 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 212 * words, the work adding blocks to the BucketCache is divided up amongst the running 213 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 214 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 215 * then updates the ramCache and backingMap accordingly. 216 */ 217 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 218 transient final WriterThread[] writerThreads; 219 220 /** Volatile boolean to track if free space is in process or not */ 221 private volatile boolean freeInProgress = false; 222 private transient final Lock freeSpaceLock = new ReentrantLock(); 223 224 private final LongAdder realCacheSize = new LongAdder(); 225 private final LongAdder heapSize = new LongAdder(); 226 /** Current number of cached elements */ 227 private final LongAdder blockNumber = new LongAdder(); 228 229 /** Cache access count (sequential ID) */ 230 private final AtomicLong accessCount = new AtomicLong(); 231 232 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 233 234 private final BucketCacheStats cacheStats; 235 private final String persistencePath; 236 static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); 237 private final long cacheCapacity; 238 /** Approximate block size */ 239 private final long blockSize; 240 241 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 242 private final int ioErrorsTolerationDuration; 243 // 1 min 244 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 245 246 // Start time of first IO error when reading or writing IO Engine, it will be 247 // reset after a successful read/write. 248 private volatile long ioErrorStartTime = -1; 249 250 private transient Configuration conf; 251 252 /** 253 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 254 * this is to avoid freeing the block which is being read. 255 * <p> 256 */ 257 transient final IdReadWriteLock<Long> offsetLock; 258 259 transient NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>( 260 Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 261 262 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 263 private transient final ScheduledExecutorService scheduleThreadPool = 264 Executors.newScheduledThreadPool(1, 265 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 266 267 // Allocate or free space for the block 268 private transient BucketAllocator bucketAllocator; 269 270 /** Acceptable size of cache (no evictions if size < acceptable) */ 271 private float acceptableFactor; 272 273 /** Minimum threshold of cache (when evicting, evict until size < min) */ 274 private float minFactor; 275 276 /** 277 * Free this floating point factor of extra blocks when evicting. For example free the number of 278 * blocks requested * (1 + extraFreeFactor) 279 */ 280 private float extraFreeFactor; 281 282 /** Single access bucket size */ 283 private float singleFactor; 284 285 /** Multiple access bucket size */ 286 private float multiFactor; 287 288 /** In-memory bucket size */ 289 private float memoryFactor; 290 291 private long bucketcachePersistInterval; 292 293 private static final String FILE_VERIFY_ALGORITHM = 294 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 295 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 296 297 static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime"; 298 private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; 299 private long queueAdditionWaitTime; 300 /** 301 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 302 * integrity, default algorithm is MD5 303 */ 304 private String algorithm; 305 306 private long persistenceChunkSize; 307 308 /* Tracing failed Bucket Cache allocations. */ 309 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 310 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 311 312 private transient Map<String, HRegion> onlineRegions; 313 314 private long orphanBlockGracePeriod = 0; 315 316 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 317 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 318 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 319 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 320 } 321 322 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 323 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 324 Configuration conf) throws IOException { 325 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 326 persistencePath, ioErrorsTolerationDuration, conf, null); 327 } 328 329 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 330 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 331 Configuration conf, Map<String, HRegion> onlineRegions) throws IOException { 332 Preconditions.checkArgument(blockSize > 0, 333 "BucketCache capacity is set to " + blockSize + ", can not be less than 0"); 334 boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT); 335 if (useStrongRef) { 336 this.offsetLock = new IdReadWriteLockStrongRef<>(); 337 } else { 338 this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); 339 } 340 this.conf = conf; 341 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 342 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 343 this.writerThreads = new WriterThread[writerThreadNum]; 344 this.onlineRegions = onlineRegions; 345 this.orphanBlockGracePeriod = 346 conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT); 347 long blockNumCapacity = capacity / blockSize; 348 if (blockNumCapacity >= Integer.MAX_VALUE) { 349 // Enough for about 32TB of cache! 350 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 351 } 352 353 // these sets the dynamic configs 354 this.onConfigurationChange(conf); 355 this.cacheStats = 356 new BucketCacheStats(conf.getInt(BLOCKCACHE_STATS_PERIODS, DEFAULT_BLOCKCACHE_STATS_PERIODS), 357 conf.getInt(BLOCKCACHE_STATS_PERIOD_MINUTES_KEY, DEFAULT_BLOCKCACHE_STATS_PERIOD_MINUTES)); 358 359 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 360 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 361 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 362 + ", useStrongRef: " + useStrongRef); 363 364 this.cacheCapacity = capacity; 365 this.persistencePath = persistencePath; 366 this.blockSize = blockSize; 367 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 368 this.cacheState = CacheState.INITIALIZING; 369 370 this.allocFailLogPrevTs = 0; 371 372 for (int i = 0; i < writerThreads.length; ++i) { 373 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 374 } 375 376 assert writerQueues.size() == writerThreads.length; 377 this.ramCache = new RAMCache(); 378 379 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 380 instantiateWriterThreads(); 381 382 if (isCachePersistent()) { 383 if (ioEngine instanceof FileIOEngine) { 384 startBucketCachePersisterThread(); 385 } 386 startPersistenceRetriever(bucketSizes, capacity); 387 } else { 388 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 389 this.cacheState = CacheState.ENABLED; 390 startWriterThreads(); 391 } 392 393 // Run the statistics thread periodically to print the cache statistics log 394 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 395 // every five minutes. 396 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 397 statThreadPeriod, TimeUnit.SECONDS); 398 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 399 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 400 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 401 + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); 402 } 403 404 private void startPersistenceRetriever(int[] bucketSizes, long capacity) { 405 Runnable persistentCacheRetriever = () -> { 406 try { 407 retrieveFromFile(bucketSizes); 408 LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); 409 } catch (Throwable ex) { 410 LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt." 411 + " Exception seen: ", persistencePath, ex); 412 backingMap.clear(); 413 fullyCachedFiles.clear(); 414 backingMapValidated.set(true); 415 regionCachedSize.clear(); 416 try { 417 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 418 } catch (BucketAllocatorException allocatorException) { 419 LOG.error("Exception during Bucket Allocation", allocatorException); 420 } 421 } finally { 422 this.cacheState = CacheState.ENABLED; 423 startWriterThreads(); 424 } 425 }; 426 Thread t = new Thread(persistentCacheRetriever); 427 t.start(); 428 } 429 430 private void sanityCheckConfigs() { 431 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 432 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 433 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 434 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 435 Preconditions.checkArgument(minFactor <= acceptableFactor, 436 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 437 Preconditions.checkArgument(extraFreeFactor >= 0, 438 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 439 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 440 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 441 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 442 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 443 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 444 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 445 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 446 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 447 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 448 if (this.persistenceChunkSize <= 0) { 449 persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 450 } 451 } 452 453 /** 454 * Called by the constructor to instantiate the writer threads. 455 */ 456 private void instantiateWriterThreads() { 457 final String threadName = Thread.currentThread().getName(); 458 for (int i = 0; i < this.writerThreads.length; ++i) { 459 this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); 460 this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 461 this.writerThreads[i].setDaemon(true); 462 } 463 } 464 465 /** 466 * Called by the constructor to start the writer threads. Used by tests that need to override 467 * starting the threads. 468 */ 469 protected void startWriterThreads() { 470 for (WriterThread thread : writerThreads) { 471 thread.start(); 472 } 473 } 474 475 void startBucketCachePersisterThread() { 476 LOG.info("Starting BucketCachePersisterThread"); 477 cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); 478 cachePersister.setDaemon(true); 479 cachePersister.start(); 480 } 481 482 @Override 483 public boolean isCacheEnabled() { 484 return this.cacheState == CacheState.ENABLED; 485 } 486 487 @Override 488 public long getMaxSize() { 489 return this.cacheCapacity; 490 } 491 492 public String getIoEngine() { 493 return ioEngine.toString(); 494 } 495 496 /** 497 * Get the IOEngine from the IO engine name 498 * @return the IOEngine 499 */ 500 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 501 throws IOException { 502 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 503 // In order to make the usage simple, we only need the prefix 'files:' in 504 // document whether one or multiple file(s), but also support 'file:' for 505 // the compatibility 506 String[] filePaths = 507 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 508 return new FileIOEngine(capacity, persistencePath != null, filePaths); 509 } else if (ioEngineName.startsWith("offheap")) { 510 return new ByteBufferIOEngine(capacity); 511 } else if (ioEngineName.startsWith("mmap:")) { 512 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 513 } else if (ioEngineName.startsWith("pmem:")) { 514 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 515 // device. Since the persistent memory device has its own address space the contents 516 // mapped to this address space does not get swapped out like in the case of mmapping 517 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 518 // can be directly referred to without having to copy them onheap. Once the RPC is done, 519 // the blocks can be returned back as in case of ByteBufferIOEngine. 520 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 521 } else { 522 throw new IllegalArgumentException( 523 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 524 } 525 } 526 527 public boolean isCachePersistenceEnabled() { 528 return persistencePath != null; 529 } 530 531 /** 532 * Cache the block with the specified name and buffer. 533 * @param cacheKey block's cache key 534 * @param buf block buffer 535 */ 536 @Override 537 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 538 cacheBlock(cacheKey, buf, false); 539 } 540 541 /** 542 * Cache the block with the specified name and buffer. 543 * @param cacheKey block's cache key 544 * @param cachedItem block buffer 545 * @param inMemory if block is in-memory 546 */ 547 @Override 548 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 549 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 550 } 551 552 /** 553 * Cache the block with the specified name and buffer. 554 * @param cacheKey block's cache key 555 * @param cachedItem block buffer 556 * @param inMemory if block is in-memory 557 */ 558 @Override 559 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 560 boolean waitWhenCache) { 561 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 562 } 563 564 /** 565 * Cache the block to ramCache 566 * @param cacheKey block's cache key 567 * @param cachedItem block buffer 568 * @param inMemory if block is in-memory 569 * @param wait if true, blocking wait when queue is full 570 */ 571 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 572 boolean wait) { 573 if (isCacheEnabled()) { 574 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 575 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 576 BucketEntry bucketEntry = backingMap.get(cacheKey); 577 if (bucketEntry != null && bucketEntry.isRpcRef()) { 578 // avoid replace when there are RPC refs for the bucket entry in bucket cache 579 return; 580 } 581 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 582 } 583 } else { 584 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 585 } 586 } 587 } 588 589 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 590 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 591 } 592 593 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 594 boolean inMemory, boolean wait) { 595 if (!isCacheEnabled()) { 596 return; 597 } 598 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 599 cacheKey.setBlockType(cachedItem.getBlockType()); 600 } 601 LOG.debug("Caching key={}, item={}, key heap size={}", cacheKey, cachedItem, 602 cacheKey.heapSize()); 603 // Stuff the entry into the RAM cache so it can get drained to the persistent store 604 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 605 inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait); 606 /** 607 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 608 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 609 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 610 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 611 * one, then the heap size will mess up (HBASE-20789) 612 */ 613 if (ramCache.putIfAbsent(cacheKey, re) != null) { 614 return; 615 } 616 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 617 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 618 boolean successfulAddition = false; 619 if (wait) { 620 try { 621 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 622 } catch (InterruptedException e) { 623 LOG.error("Thread interrupted: ", e); 624 Thread.currentThread().interrupt(); 625 } 626 } else { 627 successfulAddition = bq.offer(re); 628 } 629 if (!successfulAddition) { 630 LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey); 631 ramCache.remove(cacheKey); 632 cacheStats.failInsert(); 633 } else { 634 this.blockNumber.increment(); 635 this.heapSize.add(cachedItem.heapSize()); 636 } 637 } 638 639 /** 640 * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this 641 * method looks for the block from the referred file, in the cache. If present in the cache, the 642 * block for the referred file is returned, otherwise, this method returns null. It will also 643 * return null if the passed cache key doesn't relate to a reference. 644 * @param key the BlockCacheKey instance to look for in the cache. 645 * @return the cached block from the referred file, null if there's no such block in the cache or 646 * the passed key doesn't relate to a reference. 647 */ 648 public BucketEntry getBlockForReference(BlockCacheKey key) { 649 BucketEntry foundEntry = null; 650 String referredFileName = null; 651 if (StoreFileInfo.isReference(key.getHfileName())) { 652 referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); 653 } 654 if (referredFileName != null) { 655 // Since we just need this key for a lookup, it's enough to use only name and offset 656 BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); 657 foundEntry = backingMap.get(convertedCacheKey); 658 LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), 659 convertedCacheKey, foundEntry); 660 } 661 return foundEntry; 662 } 663 664 /** 665 * Get the buffer of the block with the specified key. 666 * @param key block's cache key 667 * @param caching true if the caller caches blocks on cache misses 668 * @param repeat Whether this is a repeat lookup for the same block 669 * @param updateCacheMetrics Whether we should update cache metrics or not 670 * @return buffer of specified cache key, or null if not in cache 671 */ 672 @Override 673 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 674 boolean updateCacheMetrics) { 675 if (!isCacheEnabled()) { 676 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 677 return null; 678 } 679 RAMQueueEntry re = ramCache.get(key); 680 if (re != null) { 681 if (updateCacheMetrics) { 682 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 683 } 684 re.access(accessCount.incrementAndGet()); 685 return re.getData(); 686 } 687 BucketEntry bucketEntry = backingMap.get(key); 688 LOG.debug("bucket entry for key {}: {}", key, 689 bucketEntry == null ? null : bucketEntry.offset()); 690 if (bucketEntry == null) { 691 bucketEntry = getBlockForReference(key); 692 } 693 if (bucketEntry != null) { 694 long start = System.nanoTime(); 695 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 696 try { 697 lock.readLock().lock(); 698 // We can not read here even if backingMap does contain the given key because its offset 699 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 700 // existence here. 701 if ( 702 bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) 703 ) { 704 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 705 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 706 // the same BucketEntry, then all of the three will share the same refCnt. 707 Cacheable cachedBlock = ioEngine.read(bucketEntry); 708 if (ioEngine.usesSharedMemory()) { 709 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 710 // same RefCnt, do retain here, in order to count the number of RPC references 711 cachedBlock.retain(); 712 } 713 // Update the cache statistics. 714 if (updateCacheMetrics) { 715 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 716 cacheStats.ioHit(System.nanoTime() - start); 717 } 718 bucketEntry.access(accessCount.incrementAndGet()); 719 if (this.ioErrorStartTime > 0) { 720 ioErrorStartTime = -1; 721 } 722 return cachedBlock; 723 } 724 } catch (HBaseIOException hioex) { 725 // When using file io engine persistent cache, 726 // the cache map state might differ from the actual cache. If we reach this block, 727 // we should remove the cache key entry from the backing map 728 backingMap.remove(key); 729 fileNotFullyCached(key, bucketEntry); 730 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 731 } catch (IOException ioex) { 732 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 733 checkIOErrorIsTolerated(); 734 } finally { 735 lock.readLock().unlock(); 736 } 737 } 738 if (!repeat && updateCacheMetrics) { 739 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 740 } 741 return null; 742 } 743 744 /** 745 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 746 */ 747 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 748 boolean evictedByEvictionProcess) { 749 bucketEntry.markAsEvicted(); 750 blocksByHFile.remove(cacheKey); 751 if (decrementBlockNumber) { 752 this.blockNumber.decrement(); 753 if (ioEngine.isPersistent()) { 754 fileNotFullyCached(cacheKey, bucketEntry); 755 } 756 } 757 if (evictedByEvictionProcess) { 758 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 759 } 760 if (ioEngine.isPersistent()) { 761 setCacheInconsistent(true); 762 } 763 } 764 765 private void fileNotFullyCached(BlockCacheKey key, BucketEntry entry) { 766 // Update the updateRegionCachedSize before removing the file from fullyCachedFiles. 767 // This computation should happen even if the file is not in fullyCachedFiles map. 768 updateRegionCachedSize(key, (entry.getLength() * -1)); 769 fullyCachedFiles.remove(key.getHfileName()); 770 } 771 772 public void fileCacheCompleted(Path filePath, long size) { 773 Pair<String, Long> pair = new Pair<>(); 774 // sets the region name 775 String regionName = filePath.getParent().getParent().getName(); 776 pair.setFirst(regionName); 777 pair.setSecond(size); 778 fullyCachedFiles.put(filePath.getName(), pair); 779 } 780 781 private void updateRegionCachedSize(BlockCacheKey key, long cachedSize) { 782 if (key.getRegionName() != null) { 783 if (key.isArchived()) { 784 LOG.trace("Skipping region cached size update for archived file:{} from region: {}", 785 key.getHfileName(), key.getRegionName()); 786 } else { 787 String regionName = key.getRegionName(); 788 regionCachedSize.merge(regionName, cachedSize, 789 (previousSize, newBlockSize) -> previousSize + newBlockSize); 790 LOG.trace("Updating region cached size for region: {}", regionName); 791 // If all the blocks for a region are evicted from the cache, 792 // remove the entry for that region from regionCachedSize map. 793 if (regionCachedSize.get(regionName) <= 0) { 794 regionCachedSize.remove(regionName); 795 } 796 } 797 } 798 } 799 800 /** 801 * Free the {{@link BucketEntry} actually,which could only be invoked when the 802 * {@link BucketEntry#refCnt} becoming 0. 803 */ 804 void freeBucketEntry(BucketEntry bucketEntry) { 805 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 806 realCacheSize.add(-1 * bucketEntry.getLength()); 807 } 808 809 /** 810 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 811 * 1. Close an HFile, and clear all cached blocks. <br> 812 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 813 * <p> 814 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 815 * Here we evict the block from backingMap immediately, but only free the reference from bucket 816 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 817 * block, block can only be de-allocated when all of them release the block. 818 * <p> 819 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 820 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 821 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 822 * @param cacheKey Block to evict 823 * @return true to indicate whether we've evicted successfully or not. 824 */ 825 @Override 826 public boolean evictBlock(BlockCacheKey cacheKey) { 827 return doEvictBlock(cacheKey, null, false); 828 } 829 830 /** 831 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 832 * {@link BucketCache#ramCache}. <br/> 833 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 834 * {@link BucketEntry} could be removed. 835 * @param cacheKey {@link BlockCacheKey} to evict. 836 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 837 * @return true to indicate whether we've evicted successfully or not. 838 */ 839 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 840 boolean evictedByEvictionProcess) { 841 if (!isCacheEnabled()) { 842 return false; 843 } 844 boolean existedInRamCache = removeFromRamCache(cacheKey); 845 if (bucketEntry == null) { 846 bucketEntry = backingMap.get(cacheKey); 847 } 848 final BucketEntry bucketEntryToUse = bucketEntry; 849 850 if (bucketEntryToUse == null) { 851 if (existedInRamCache && evictedByEvictionProcess) { 852 cacheStats.evicted(0, cacheKey.isPrimary()); 853 } 854 return existedInRamCache; 855 } else { 856 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 857 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 858 LOG.debug("removed key {} from back map with offset lock {} in the evict process", 859 cacheKey, bucketEntryToUse.offset()); 860 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 861 return true; 862 } 863 return false; 864 }); 865 } 866 } 867 868 /** 869 * <pre> 870 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 871 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 872 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 873 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 874 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 875 * it is the return value of current {@link BucketCache#createRecycler} method. 876 * 877 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 878 * it is {@link ByteBuffAllocator#putbackBuffer}. 879 * </pre> 880 */ 881 public Recycler createRecycler(final BucketEntry bucketEntry) { 882 return () -> { 883 freeBucketEntry(bucketEntry); 884 return; 885 }; 886 } 887 888 /** 889 * NOTE: This method is only for test. 890 */ 891 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 892 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 893 if (bucketEntry == null) { 894 return false; 895 } 896 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 897 } 898 899 /** 900 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 901 * {@link BucketEntry#isRpcRef} is false. <br/> 902 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 903 * {@link BucketEntry} could be removed. 904 * @param blockCacheKey {@link BlockCacheKey} to evict. 905 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 906 * @return true to indicate whether we've evicted successfully or not. 907 */ 908 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 909 if (!bucketEntry.isRpcRef()) { 910 return doEvictBlock(blockCacheKey, bucketEntry, true); 911 } 912 return false; 913 } 914 915 /** 916 * Since HBASE-29249, the following properties governin freeSpace behaviour and block priorities 917 * were made dynamically configurable: - hbase.bucketcache.acceptfactor - 918 * hbase.bucketcache.minfactor - hbase.bucketcache.extrafreefactor - 919 * hbase.bucketcache.single.factor - hbase.bucketcache.multi.factor - 920 * hbase.bucketcache.multi.factor - hbase.bucketcache.memory.factor The 921 * hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the 922 * publishing of blocks for the cache writer threads during prefetch reads only (client reads 923 * wouldn't get delayed). It has also been made dynamic configurable since HBASE-29249. The 924 * hbase.bucketcache.persist.intervalinmillis propperty determines the frequency for saving the 925 * persistent cache, and it has also been made dynamically configurable since HBASE-29249. The 926 * hbase.bucketcache.persistence.chunksize property determines the size of the persistent file 927 * splits (due to the limitation of maximum allowed protobuff size), and it has also been made 928 * dynamically configurable since HBASE-29249. 929 * @param config the new configuration to be updated. 930 */ 931 @Override 932 public void onConfigurationChange(Configuration config) { 933 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 934 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 935 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 936 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 937 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 938 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 939 this.queueAdditionWaitTime = 940 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 941 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 942 this.persistenceChunkSize = 943 conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); 944 sanityCheckConfigs(); 945 } 946 947 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 948 return ramCache.remove(cacheKey, re -> { 949 if (re != null) { 950 this.blockNumber.decrement(); 951 this.heapSize.add(-1 * re.getData().heapSize()); 952 } 953 }); 954 } 955 956 public boolean isCacheInconsistent() { 957 return isCacheInconsistent.get(); 958 } 959 960 public void setCacheInconsistent(boolean setCacheInconsistent) { 961 isCacheInconsistent.set(setCacheInconsistent); 962 } 963 964 protected void setCacheState(CacheState state) { 965 cacheState = state; 966 } 967 968 /* 969 * Statistics thread. Periodically output cache statistics to the log. 970 */ 971 private static class StatisticsThread extends Thread { 972 private final BucketCache bucketCache; 973 974 public StatisticsThread(BucketCache bucketCache) { 975 super("BucketCacheStatsThread"); 976 setDaemon(true); 977 this.bucketCache = bucketCache; 978 } 979 980 @Override 981 public void run() { 982 bucketCache.logStats(); 983 } 984 } 985 986 public void logStats() { 987 if (!isCacheInitialized("BucketCache::logStats")) { 988 return; 989 } 990 991 long totalSize = bucketAllocator.getTotalSize(); 992 long usedSize = bucketAllocator.getUsedSize(); 993 long freeSize = totalSize - usedSize; 994 long cacheSize = getRealCacheSize(); 995 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 996 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 997 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 998 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 999 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 1000 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 1001 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 1002 + (cacheStats.getHitCount() == 0 1003 ? "0," 1004 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 1005 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 1006 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 1007 + (cacheStats.getHitCachingCount() == 0 1008 ? "0," 1009 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 1010 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 1011 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 1012 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount=" 1013 + backingMap.size()); 1014 cacheStats.reset(); 1015 1016 bucketAllocator.logDebugStatistics(); 1017 } 1018 1019 public long getRealCacheSize() { 1020 return this.realCacheSize.sum(); 1021 } 1022 1023 public long acceptableSize() { 1024 if (!isCacheInitialized("BucketCache::acceptableSize")) { 1025 return 0; 1026 } 1027 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 1028 } 1029 1030 long getPartitionSize(float partitionFactor) { 1031 if (!isCacheInitialized("BucketCache::getPartitionSize")) { 1032 return 0; 1033 } 1034 1035 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 1036 } 1037 1038 /** 1039 * Return the count of bucketSizeinfos still need free space 1040 */ 1041 private int bucketSizesAboveThresholdCount(float minFactor) { 1042 if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { 1043 return 0; 1044 } 1045 1046 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1047 int fullCount = 0; 1048 for (int i = 0; i < stats.length; i++) { 1049 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1050 freeGoal = Math.max(freeGoal, 1); 1051 if (stats[i].freeCount() < freeGoal) { 1052 fullCount++; 1053 } 1054 } 1055 return fullCount; 1056 } 1057 1058 /** 1059 * This method will find the buckets that are minimally occupied and are not reference counted and 1060 * will free them completely without any constraint on the access times of the elements, and as a 1061 * process will completely free at most the number of buckets passed, sometimes it might not due 1062 * to changing refCounts 1063 * @param completelyFreeBucketsNeeded number of buckets to free 1064 **/ 1065 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 1066 if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { 1067 return; 1068 } 1069 1070 if (completelyFreeBucketsNeeded != 0) { 1071 // First we will build a set where the offsets are reference counted, usually 1072 // this set is small around O(Handler Count) unless something else is wrong 1073 Set<Integer> inUseBuckets = new HashSet<>(); 1074 backingMap.forEach((k, be) -> { 1075 if (be.isRpcRef()) { 1076 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 1077 } 1078 }); 1079 Set<Integer> candidateBuckets = 1080 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 1081 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 1082 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 1083 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 1084 } 1085 } 1086 } 1087 } 1088 1089 private long calculateBytesToFree(StringBuilder msgBuffer) { 1090 long bytesToFreeWithoutExtra = 0; 1091 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1092 long[] bytesToFreeForBucket = new long[stats.length]; 1093 for (int i = 0; i < stats.length; i++) { 1094 bytesToFreeForBucket[i] = 0; 1095 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1096 freeGoal = Math.max(freeGoal, 1); 1097 if (stats[i].freeCount() < freeGoal) { 1098 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1099 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1100 if (msgBuffer != null) { 1101 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1102 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1103 } 1104 } 1105 } 1106 if (msgBuffer != null) { 1107 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1108 } 1109 return bytesToFreeWithoutExtra; 1110 } 1111 1112 /** 1113 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 1114 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 1115 * blocks evicted 1116 * @param why Why we are being called 1117 */ 1118 void freeSpace(final String why) { 1119 if (!isCacheInitialized("BucketCache::freeSpace")) { 1120 return; 1121 } 1122 // Ensure only one freeSpace progress at a time 1123 if (!freeSpaceLock.tryLock()) { 1124 return; 1125 } 1126 try { 1127 freeInProgress = true; 1128 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1129 long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer); 1130 if (bytesToFreeWithoutExtra <= 0) { 1131 return; 1132 } 1133 long currentSize = bucketAllocator.getUsedSize(); 1134 long totalSize = bucketAllocator.getTotalSize(); 1135 if (LOG.isDebugEnabled() && msgBuffer != null) { 1136 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used=" 1137 + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1138 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1139 + StringUtils.byteDesc(totalSize)); 1140 } 1141 long bytesToFreeWithExtra = 1142 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1143 // Instantiate priority buckets 1144 BucketEntryGroup bucketSingle = 1145 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1146 BucketEntryGroup bucketMulti = 1147 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1148 BucketEntryGroup bucketMemory = 1149 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1150 1151 Set<String> allValidFiles = null; 1152 // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not. 1153 // See further comments below for more details. 1154 if (onlineRegions != null) { 1155 allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions); 1156 } 1157 // the cached time is recored in nanos, so we need to convert the grace period accordingly 1158 long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000; 1159 long bytesFreed = 0; 1160 // Check the list of files to determine the cold files which can be readily evicted. 1161 Map<String, String> coldFiles = null; 1162 1163 DataTieringManager dataTieringManager = DataTieringManager.getInstance(); 1164 if (dataTieringManager != null) { 1165 coldFiles = dataTieringManager.getColdFilesList(); 1166 } 1167 // Scan entire map putting bucket entry into appropriate bucket entry 1168 // group 1169 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1170 BlockCacheKey key = bucketEntryWithKey.getKey(); 1171 BucketEntry entry = bucketEntryWithKey.getValue(); 1172 // Under certain conditions, blocks for regions not on the current region server might 1173 // be hanging on the cache. For example, when using the persistent cache feature, if the 1174 // RS crashes, then if not the same regions are assigned back once its online again, blocks 1175 // for the previous online regions would be recovered and stay in the cache. These would be 1176 // "orphan" blocks, as the files these blocks belong to are not in any of the online 1177 // regions. 1178 // "Orphan" blocks are a pure waste of cache space and should be evicted first during 1179 // the freespace run. 1180 // Compactions and Flushes may cache blocks before its files are completely written. In 1181 // these cases the file won't be found in any of the online regions stores, but the block 1182 // shouldn't be evicted. To avoid this, we defined this 1183 // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace 1184 // period (default 24 hours) where a block should be checked if it's an orphan block. 1185 if ( 1186 allValidFiles != null 1187 && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos) 1188 ) { 1189 if (!allValidFiles.contains(key.getHfileName())) { 1190 if (evictBucketEntryIfNoRpcReferenced(key, entry)) { 1191 // We calculate the freed bytes, but we don't stop if the goal was reached because 1192 // these are orphan blocks anyway, so let's leverage this run of freeSpace 1193 // to get rid of all orphans at once. 1194 bytesFreed += entry.getLength(); 1195 continue; 1196 } 1197 } 1198 } 1199 1200 if ( 1201 bytesFreed < bytesToFreeWithExtra && coldFiles != null 1202 && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName()) 1203 ) { 1204 int freedBlockSize = bucketEntryWithKey.getValue().getLength(); 1205 if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) { 1206 bytesFreed += freedBlockSize; 1207 } 1208 continue; 1209 } 1210 1211 switch (entry.getPriority()) { 1212 case SINGLE: { 1213 bucketSingle.add(bucketEntryWithKey); 1214 break; 1215 } 1216 case MULTI: { 1217 bucketMulti.add(bucketEntryWithKey); 1218 break; 1219 } 1220 case MEMORY: { 1221 bucketMemory.add(bucketEntryWithKey); 1222 break; 1223 } 1224 } 1225 } 1226 1227 // Check if the cold file eviction is sufficient to create enough space. 1228 bytesToFreeWithExtra -= bytesFreed; 1229 if (bytesToFreeWithExtra <= 0) { 1230 LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.", 1231 StringUtils.byteDesc(bytesFreed)); 1232 return; 1233 } 1234 1235 if (LOG.isDebugEnabled()) { 1236 LOG.debug( 1237 "Bucket cache free space completed; freed space : {} " 1238 + "bytes of cold data blocks. {} more bytes required to be freed.", 1239 StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra); 1240 } 1241 1242 PriorityQueue<BucketEntryGroup> bucketQueue = 1243 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1244 1245 bucketQueue.add(bucketSingle); 1246 bucketQueue.add(bucketMulti); 1247 bucketQueue.add(bucketMemory); 1248 1249 int remainingBuckets = bucketQueue.size(); 1250 BucketEntryGroup bucketGroup; 1251 while ((bucketGroup = bucketQueue.poll()) != null) { 1252 long overflow = bucketGroup.overflow(); 1253 if (overflow > 0) { 1254 long bucketBytesToFree = 1255 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1256 bytesFreed += bucketGroup.free(bucketBytesToFree); 1257 } 1258 remainingBuckets--; 1259 } 1260 1261 // Check and free if there are buckets that still need freeing of space 1262 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1263 bucketQueue.clear(); 1264 remainingBuckets = 3; 1265 bucketQueue.add(bucketSingle); 1266 bucketQueue.add(bucketMulti); 1267 bucketQueue.add(bucketMemory); 1268 while ((bucketGroup = bucketQueue.poll()) != null) { 1269 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1270 bytesFreed += bucketGroup.free(bucketBytesToFree); 1271 remainingBuckets--; 1272 } 1273 } 1274 // Even after the above free we might still need freeing because of the 1275 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 1276 // there might be some buckets where the occupancy is very sparse and thus are not 1277 // yielding the free for the other bucket sizes, the fix for this to evict some 1278 // of the buckets, we do this by evicting the buckets that are least fulled 1279 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1280 1281 if (LOG.isDebugEnabled()) { 1282 long single = bucketSingle.totalSize(); 1283 long multi = bucketMulti.totalSize(); 1284 long memory = bucketMemory.totalSize(); 1285 if (LOG.isDebugEnabled()) { 1286 LOG.debug("Bucket cache free space completed; " + "freed=" 1287 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1288 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1289 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1290 } 1291 } 1292 } catch (Throwable t) { 1293 LOG.warn("Failed freeing space", t); 1294 } finally { 1295 cacheStats.evict(); 1296 freeInProgress = false; 1297 freeSpaceLock.unlock(); 1298 } 1299 } 1300 1301 // This handles flushing the RAM cache to IOEngine. 1302 class WriterThread extends Thread { 1303 private final BlockingQueue<RAMQueueEntry> inputQueue; 1304 private volatile boolean writerEnabled = true; 1305 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1306 1307 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1308 super("BucketCacheWriterThread"); 1309 this.inputQueue = queue; 1310 } 1311 1312 // Used for test 1313 void disableWriter() { 1314 this.writerEnabled = false; 1315 } 1316 1317 @Override 1318 public void run() { 1319 List<RAMQueueEntry> entries = new ArrayList<>(); 1320 try { 1321 while (isCacheEnabled() && writerEnabled) { 1322 try { 1323 try { 1324 // Blocks 1325 entries = getRAMQueueEntries(inputQueue, entries); 1326 } catch (InterruptedException ie) { 1327 if (!isCacheEnabled() || !writerEnabled) { 1328 break; 1329 } 1330 } 1331 doDrain(entries, metaBuff); 1332 } catch (Exception ioe) { 1333 LOG.error("WriterThread encountered error", ioe); 1334 } 1335 } 1336 } catch (Throwable t) { 1337 LOG.warn("Failed doing drain", t); 1338 } 1339 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1340 } 1341 } 1342 1343 /** 1344 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1345 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1346 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1347 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1348 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1349 * bucketAllocator do not free its memory. 1350 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1351 * cacheKey, Cacheable newBlock) 1352 * @param key Block cache key 1353 * @param bucketEntry Bucket entry to put into backingMap. 1354 */ 1355 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1356 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1357 blocksByHFile.add(key); 1358 updateRegionCachedSize(key, bucketEntry.getLength()); 1359 if (previousEntry != null && previousEntry != bucketEntry) { 1360 previousEntry.withWriteLock(offsetLock, () -> { 1361 blockEvicted(key, previousEntry, false, false); 1362 return null; 1363 }); 1364 } 1365 } 1366 1367 /** 1368 * Prepare and return a warning message for Bucket Allocator Exception 1369 * @param fle The exception 1370 * @param re The RAMQueueEntry for which the exception was thrown. 1371 * @return A warning message created from the input RAMQueueEntry object. 1372 */ 1373 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1374 final RAMQueueEntry re) { 1375 final StringBuilder sb = new StringBuilder(); 1376 sb.append("Most recent failed allocation after "); 1377 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1378 sb.append(" ms;"); 1379 if (re != null) { 1380 if (re.getData() instanceof HFileBlock) { 1381 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1382 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1383 final String tableName = Bytes.toString(fileContext.getTableName()); 1384 if (tableName != null) { 1385 sb.append(" Table: "); 1386 sb.append(tableName); 1387 } 1388 if (columnFamily != null) { 1389 sb.append(" CF: "); 1390 sb.append(columnFamily); 1391 } 1392 sb.append(" HFile: "); 1393 if (fileContext.getHFileName() != null) { 1394 sb.append(fileContext.getHFileName()); 1395 } else { 1396 sb.append(re.getKey()); 1397 } 1398 } else { 1399 sb.append(" HFile: "); 1400 sb.append(re.getKey()); 1401 } 1402 } 1403 sb.append(" Message: "); 1404 sb.append(fle.getMessage()); 1405 return sb.toString(); 1406 } 1407 1408 /** 1409 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1410 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1411 * references and we'll OOME. 1412 * @param entries Presumes list passed in here will be processed by this invocation only. No 1413 * interference expected. 1414 */ 1415 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1416 if (entries.isEmpty()) { 1417 return; 1418 } 1419 // This method is a little hard to follow. We run through the passed in entries and for each 1420 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1421 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1422 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1423 // filling ramCache. We do the clean up by again running through the passed in entries 1424 // doing extra work when we find a non-null bucketEntries corresponding entry. 1425 final int size = entries.size(); 1426 BucketEntry[] bucketEntries = new BucketEntry[size]; 1427 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1428 // when we go to add an entry by going around the loop again without upping the index. 1429 int index = 0; 1430 while (isCacheEnabled() && index < size) { 1431 RAMQueueEntry re = null; 1432 try { 1433 re = entries.get(index); 1434 if (re == null) { 1435 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1436 index++; 1437 continue; 1438 } 1439 // Reset the position for reuse. 1440 // It should be guaranteed that the data in the metaBuff has been transferred to the 1441 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1442 // transferred with our current IOEngines. Should take care, when we have new kinds of 1443 // IOEngine in the future. 1444 metaBuff.clear(); 1445 BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize, 1446 this::createRecycler, metaBuff, acceptableSize()); 1447 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 1448 bucketEntries[index] = bucketEntry; 1449 if (ioErrorStartTime > 0) { 1450 ioErrorStartTime = -1; 1451 } 1452 index++; 1453 } catch (BucketAllocatorException fle) { 1454 long currTs = EnvironmentEdgeManager.currentTime(); 1455 cacheStats.allocationFailed(); // Record the warning. 1456 if ( 1457 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1458 ) { 1459 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1460 allocFailLogPrevTs = currTs; 1461 } 1462 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1463 bucketEntries[index] = null; 1464 index++; 1465 } catch (CacheFullException cfe) { 1466 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1467 if (!freeInProgress && !re.isPrefetch()) { 1468 freeSpace("Full!"); 1469 } else if (re.isPrefetch()) { 1470 bucketEntries[index] = null; 1471 index++; 1472 } else { 1473 Thread.sleep(50); 1474 } 1475 } catch (IOException ioex) { 1476 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1477 LOG.error("Failed writing to bucket cache", ioex); 1478 checkIOErrorIsTolerated(); 1479 } 1480 } 1481 1482 // Make sure data pages are written on media before we update maps. 1483 try { 1484 ioEngine.sync(); 1485 } catch (IOException ioex) { 1486 LOG.error("Failed syncing IO engine", ioex); 1487 checkIOErrorIsTolerated(); 1488 // Since we failed sync, free the blocks in bucket allocator 1489 for (int i = 0; i < entries.size(); ++i) { 1490 BucketEntry bucketEntry = bucketEntries[i]; 1491 if (bucketEntry != null) { 1492 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1493 bucketEntries[i] = null; 1494 } 1495 } 1496 } 1497 1498 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1499 // success or error. 1500 for (int i = 0; i < size; ++i) { 1501 BlockCacheKey key = entries.get(i).getKey(); 1502 // Only add if non-null entry. 1503 if (bucketEntries[i] != null) { 1504 putIntoBackingMap(key, bucketEntries[i]); 1505 if (ioEngine.isPersistent()) { 1506 setCacheInconsistent(true); 1507 } 1508 } 1509 // Always remove from ramCache even if we failed adding it to the block cache above. 1510 boolean existed = ramCache.remove(key, re -> { 1511 if (re != null) { 1512 heapSize.add(-1 * re.getData().heapSize()); 1513 } 1514 }); 1515 if (!existed && bucketEntries[i] != null) { 1516 // Block should have already been evicted. Remove it and free space. 1517 final BucketEntry bucketEntry = bucketEntries[i]; 1518 bucketEntry.withWriteLock(offsetLock, () -> { 1519 if (backingMap.remove(key, bucketEntry)) { 1520 blockEvicted(key, bucketEntry, false, false); 1521 } 1522 return null; 1523 }); 1524 } 1525 long used = bucketAllocator.getUsedSize(); 1526 if (!entries.get(i).isPrefetch() && used > acceptableSize()) { 1527 LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey()); 1528 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1529 } 1530 } 1531 1532 } 1533 1534 /** 1535 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1536 * returning. 1537 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1538 * in case. 1539 * @param q The queue to take from. 1540 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1541 */ 1542 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1543 List<RAMQueueEntry> receptacle) throws InterruptedException { 1544 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1545 // ok even if list grew to accommodate thousands. 1546 receptacle.clear(); 1547 receptacle.add(q.take()); 1548 q.drainTo(receptacle); 1549 return receptacle; 1550 } 1551 1552 /** 1553 * @see #retrieveFromFile(int[]) 1554 */ 1555 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1556 justification = "false positive, try-with-resources ensures close is called.") 1557 void persistToFile() throws IOException { 1558 LOG.debug("Thread {} started persisting bucket cache to file", 1559 Thread.currentThread().getName()); 1560 if (!isCachePersistent()) { 1561 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1562 } 1563 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1564 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1565 LOG.debug("Persist in new chunked persistence format."); 1566 1567 persistChunkedBackingMap(fos); 1568 1569 LOG.debug( 1570 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1571 + " file name: {}", 1572 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1573 } catch (IOException e) { 1574 LOG.error("Failed to persist bucket cache to file", e); 1575 throw e; 1576 } catch (Throwable e) { 1577 LOG.error("Failed during persist bucket cache to file: ", e); 1578 throw e; 1579 } 1580 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1581 Thread.currentThread().getName()); 1582 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1583 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1584 + "RS crashes/restarts before we successfully checkpoint again."); 1585 } 1586 } 1587 1588 public boolean isCachePersistent() { 1589 return ioEngine.isPersistent() && persistencePath != null; 1590 } 1591 1592 @Override 1593 public Optional<Map<String, Long>> getRegionCachedInfo() { 1594 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1595 } 1596 1597 /** 1598 * @see #persistToFile() 1599 */ 1600 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1601 LOG.info("Started retrieving bucket cache from file"); 1602 File persistenceFile = new File(persistencePath); 1603 if (!persistenceFile.exists()) { 1604 LOG.warn("Persistence file missing! " 1605 + "It's ok if it's first run after enabling persistent cache."); 1606 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1607 blockNumber.add(backingMap.size()); 1608 backingMapValidated.set(true); 1609 return; 1610 } 1611 assert !isCacheEnabled(); 1612 1613 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1614 int pblen = ProtobufMagic.lengthOfPBMagic(); 1615 byte[] pbuf = new byte[pblen]; 1616 IOUtils.readFully(in, pbuf, 0, pblen); 1617 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1618 LOG.info("Reading old format of persistence."); 1619 // The old non-chunked version of backing map persistence. 1620 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1621 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1622 // The new persistence format of chunked persistence. 1623 LOG.info("Reading new chunked format of persistence."); 1624 retrieveChunkedBackingMap(in); 1625 } else { 1626 // In 3.0 we have enough flexibility to dump the old cache data. 1627 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1628 throw new IOException( 1629 "Persistence file does not start with protobuf magic number. " + persistencePath); 1630 } 1631 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1632 blockNumber.add(backingMap.size()); 1633 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1634 } 1635 } 1636 1637 private void updateRegionSizeMapWhileRetrievingFromFile() { 1638 // Update the regionCachedSize with the region size while restarting the region server 1639 if (LOG.isDebugEnabled()) { 1640 LOG.debug("Updating region size map after retrieving cached file list"); 1641 dumpPrefetchList(); 1642 } 1643 regionCachedSize.clear(); 1644 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1645 // Get the region name for each file 1646 String regionEncodedName = hFileSize.getFirst(); 1647 long cachedFileSize = hFileSize.getSecond(); 1648 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1649 (oldpf, fileSize) -> oldpf + fileSize); 1650 }); 1651 } 1652 1653 private void dumpPrefetchList() { 1654 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1655 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1656 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1657 } 1658 } 1659 1660 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1661 throws IOException { 1662 if (capacitySize != cacheCapacity) { 1663 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1664 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1665 } 1666 if (!ioEngine.getClass().getName().equals(ioclass)) { 1667 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1668 + ioEngine.getClass().getName()); 1669 } 1670 if (!backingMap.getClass().getName().equals(mapclass)) { 1671 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1672 + backingMap.getClass().getName()); 1673 } 1674 } 1675 1676 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1677 try { 1678 if (proto.hasChecksum()) { 1679 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1680 algorithm); 1681 } 1682 backingMapValidated.set(true); 1683 } catch (IOException e) { 1684 LOG.warn("Checksum for cache file failed. " 1685 + "We need to validate each cache key in the backing map. " 1686 + "This may take some time, so we'll do it in a background thread,"); 1687 1688 Runnable cacheValidator = () -> { 1689 while (bucketAllocator == null) { 1690 try { 1691 Thread.sleep(50); 1692 } catch (InterruptedException ex) { 1693 throw new RuntimeException(ex); 1694 } 1695 } 1696 long startTime = EnvironmentEdgeManager.currentTime(); 1697 int totalKeysOriginally = backingMap.size(); 1698 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1699 try { 1700 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1701 } catch (IOException e1) { 1702 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1703 evictBlock(keyEntry.getKey()); 1704 fileNotFullyCached(keyEntry.getKey(), keyEntry.getValue()); 1705 } 1706 } 1707 backingMapValidated.set(true); 1708 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", 1709 totalKeysOriginally, backingMap.size(), 1710 (EnvironmentEdgeManager.currentTime() - startTime)); 1711 }; 1712 Thread t = new Thread(cacheValidator); 1713 t.setDaemon(true); 1714 t.start(); 1715 } 1716 } 1717 1718 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1719 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1720 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1721 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1722 backingMap.putAll(pair2.getFirst()); 1723 blocksByHFile.addAll(pair2.getSecond()); 1724 } 1725 1726 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1727 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1728 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1729 this::createRecycler); 1730 backingMap = pair.getFirst(); 1731 blocksByHFile = pair.getSecond(); 1732 fullyCachedFiles.clear(); 1733 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1734 1735 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1736 fullyCachedFiles.size()); 1737 1738 verifyFileIntegrity(proto); 1739 updateRegionSizeMapWhileRetrievingFromFile(); 1740 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1741 } 1742 1743 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1744 LOG.debug( 1745 "persistToFile: before persisting backing map size: {}, " 1746 + "fullycachedFiles size: {}, chunkSize: {}", 1747 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1748 1749 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1750 1751 LOG.debug( 1752 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1753 backingMap.size(), fullyCachedFiles.size()); 1754 } 1755 1756 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1757 // Read the first chunk that has all the details. 1758 BucketCacheProtos.BucketCacheEntry cacheEntry = 1759 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1760 1761 fullyCachedFiles.clear(); 1762 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1763 1764 backingMap.clear(); 1765 blocksByHFile.clear(); 1766 1767 // Read the backing map entries in batches. 1768 int numChunks = 0; 1769 while (in.available() > 0) { 1770 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1771 cacheEntry.getDeserializersMap()); 1772 numChunks++; 1773 } 1774 1775 LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size()); 1776 verifyFileIntegrity(cacheEntry); 1777 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1778 cacheEntry.getMapClass()); 1779 updateRegionSizeMapWhileRetrievingFromFile(); 1780 } 1781 1782 /** 1783 * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors 1784 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1785 */ 1786 private void checkIOErrorIsTolerated() { 1787 long now = EnvironmentEdgeManager.currentTime(); 1788 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1789 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1790 if (ioErrorStartTimeTmp > 0) { 1791 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1792 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1793 + "ms, disabling cache, please check your IOEngine"); 1794 disableCache(); 1795 } 1796 } else { 1797 this.ioErrorStartTime = now; 1798 } 1799 } 1800 1801 /** 1802 * Used to shut down the cache -or- turn it off in the case of something broken. 1803 */ 1804 private void disableCache() { 1805 if (!isCacheEnabled()) { 1806 return; 1807 } 1808 LOG.info("Disabling cache"); 1809 cacheState = CacheState.DISABLED; 1810 ioEngine.shutdown(); 1811 this.scheduleThreadPool.shutdown(); 1812 for (int i = 0; i < writerThreads.length; ++i) 1813 writerThreads[i].interrupt(); 1814 this.ramCache.clear(); 1815 if (!ioEngine.isPersistent() || persistencePath == null) { 1816 // If persistent ioengine and a path, we will serialize out the backingMap. 1817 this.backingMap.clear(); 1818 this.blocksByHFile.clear(); 1819 this.fullyCachedFiles.clear(); 1820 this.regionCachedSize.clear(); 1821 } 1822 if (cacheStats.getMetricsRollerScheduler() != null) { 1823 cacheStats.getMetricsRollerScheduler().shutdownNow(); 1824 } 1825 } 1826 1827 private void join() throws InterruptedException { 1828 for (int i = 0; i < writerThreads.length; ++i) 1829 writerThreads[i].join(); 1830 } 1831 1832 @Override 1833 public void shutdown() { 1834 if (isCacheEnabled()) { 1835 disableCache(); 1836 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1837 + "; path to write=" + persistencePath); 1838 if (ioEngine.isPersistent() && persistencePath != null) { 1839 try { 1840 join(); 1841 if (cachePersister != null) { 1842 LOG.info("Shutting down cache persister thread."); 1843 cachePersister.shutdown(); 1844 while (cachePersister.isAlive()) { 1845 Thread.sleep(10); 1846 } 1847 } 1848 persistToFile(); 1849 } catch (IOException ex) { 1850 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1851 } catch (InterruptedException e) { 1852 LOG.warn("Failed to persist data on exit", e); 1853 } 1854 } 1855 } 1856 } 1857 1858 /** 1859 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances 1860 * on different UT methods. 1861 */ 1862 @Override 1863 protected void finalize() { 1864 if (cachePersister != null && !cachePersister.isInterrupted()) { 1865 cachePersister.interrupt(); 1866 } 1867 } 1868 1869 @Override 1870 public CacheStats getStats() { 1871 return cacheStats; 1872 } 1873 1874 public BucketAllocator getAllocator() { 1875 return this.bucketAllocator; 1876 } 1877 1878 @Override 1879 public long heapSize() { 1880 return this.heapSize.sum(); 1881 } 1882 1883 @Override 1884 public long size() { 1885 return this.realCacheSize.sum(); 1886 } 1887 1888 @Override 1889 public long getCurrentDataSize() { 1890 return size(); 1891 } 1892 1893 @Override 1894 public long getFreeSize() { 1895 if (!isCacheInitialized("BucketCache:getFreeSize")) { 1896 return 0; 1897 } 1898 return this.bucketAllocator.getFreeSize(); 1899 } 1900 1901 @Override 1902 public long getBlockCount() { 1903 return this.blockNumber.sum(); 1904 } 1905 1906 @Override 1907 public long getDataBlockCount() { 1908 return getBlockCount(); 1909 } 1910 1911 @Override 1912 public long getCurrentSize() { 1913 if (!isCacheInitialized("BucketCache::getCurrentSize")) { 1914 return 0; 1915 } 1916 return this.bucketAllocator.getUsedSize(); 1917 } 1918 1919 protected String getAlgorithm() { 1920 return algorithm; 1921 } 1922 1923 /** 1924 * Evicts all blocks for a specific HFile. 1925 * <p> 1926 * This is used for evict-on-close to remove all blocks of a specific HFile. 1927 * @return the number of blocks evicted 1928 */ 1929 @Override 1930 public int evictBlocksByHfileName(String hfileName) { 1931 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); 1932 } 1933 1934 @Override 1935 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { 1936 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); 1937 // We need to make sure whether we are evicting all blocks for this given file. In case of 1938 // split references, we might be evicting just half of the blocks 1939 LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), 1940 hfileName, initOffset, endOffset); 1941 int numEvicted = 0; 1942 for (BlockCacheKey key : keySet) { 1943 if (evictBlock(key)) { 1944 ++numEvicted; 1945 } 1946 } 1947 return numEvicted; 1948 } 1949 1950 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) { 1951 // These keys are just for comparison and are short lived, so we need only file name and offset 1952 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, 1953 new BlockCacheKey(hfileName, end), true); 1954 } 1955 1956 /** 1957 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1958 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1959 * number of elements out of each according to configuration parameters and their relative sizes. 1960 */ 1961 private class BucketEntryGroup { 1962 1963 private CachedEntryQueue queue; 1964 private long totalSize = 0; 1965 private long bucketSize; 1966 1967 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1968 this.bucketSize = bucketSize; 1969 queue = new CachedEntryQueue(bytesToFree, blockSize); 1970 totalSize = 0; 1971 } 1972 1973 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1974 totalSize += block.getValue().getLength(); 1975 queue.add(block); 1976 } 1977 1978 public long free(long toFree) { 1979 Map.Entry<BlockCacheKey, BucketEntry> entry; 1980 long freedBytes = 0; 1981 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1982 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1983 while ((entry = queue.pollLast()) != null) { 1984 BlockCacheKey blockCacheKey = entry.getKey(); 1985 BucketEntry be = entry.getValue(); 1986 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1987 freedBytes += be.getLength(); 1988 } 1989 if (freedBytes >= toFree) { 1990 return freedBytes; 1991 } 1992 } 1993 return freedBytes; 1994 } 1995 1996 public long overflow() { 1997 return totalSize - bucketSize; 1998 } 1999 2000 public long totalSize() { 2001 return totalSize; 2002 } 2003 } 2004 2005 /** 2006 * Block Entry stored in the memory with key,data and so on 2007 */ 2008 static class RAMQueueEntry { 2009 private final BlockCacheKey key; 2010 private final Cacheable data; 2011 private long accessCounter; 2012 private boolean inMemory; 2013 private boolean isCachePersistent; 2014 2015 private boolean isPrefetch; 2016 2017 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 2018 boolean isCachePersistent, boolean isPrefetch) { 2019 this.key = bck; 2020 this.data = data; 2021 this.accessCounter = accessCounter; 2022 this.inMemory = inMemory; 2023 this.isCachePersistent = isCachePersistent; 2024 this.isPrefetch = isPrefetch; 2025 } 2026 2027 public Cacheable getData() { 2028 return data; 2029 } 2030 2031 public BlockCacheKey getKey() { 2032 return key; 2033 } 2034 2035 public boolean isPrefetch() { 2036 return isPrefetch; 2037 } 2038 2039 public void access(long accessCounter) { 2040 this.accessCounter = accessCounter; 2041 } 2042 2043 private ByteBuffAllocator getByteBuffAllocator() { 2044 if (data instanceof HFileBlock) { 2045 return ((HFileBlock) data).getByteBuffAllocator(); 2046 } 2047 return ByteBuffAllocator.HEAP; 2048 } 2049 2050 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 2051 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 2052 ByteBuffer metaBuff, final Long acceptableSize) throws IOException { 2053 int len = data.getSerializedLength(); 2054 // This cacheable thing can't be serialized 2055 if (len == 0) { 2056 return null; 2057 } 2058 if (isCachePersistent && data instanceof HFileBlock) { 2059 len += Long.BYTES; // we need to record the cache time for consistency check in case of 2060 // recovery 2061 } 2062 long offset = alloc.allocateBlock(len); 2063 // In the case of prefetch, we want to avoid freeSpace runs when the cache is full. 2064 // this makes the cache allocation more predictable, and is particularly important 2065 // when persistent cache is enabled, as it won't trigger evictions of the recovered blocks, 2066 // which are likely the most accessed and relevant blocks in the cache. 2067 if (isPrefetch() && alloc.getUsedSize() > acceptableSize) { 2068 alloc.freeBlock(offset, len); 2069 return null; 2070 } 2071 boolean succ = false; 2072 BucketEntry bucketEntry = null; 2073 try { 2074 int diskSizeWithHeader = (data instanceof HFileBlock) 2075 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 2076 : data.getSerializedLength(); 2077 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 2078 createRecycler, getByteBuffAllocator()); 2079 bucketEntry.setDeserializerReference(data.getDeserializer()); 2080 if (data instanceof HFileBlock) { 2081 // If an instance of HFileBlock, save on some allocations. 2082 HFileBlock block = (HFileBlock) data; 2083 ByteBuff sliceBuf = block.getBufferReadOnly(); 2084 block.getMetaData(metaBuff); 2085 // adds the cache time prior to the block and metadata part 2086 if (isCachePersistent) { 2087 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 2088 buffer.putLong(bucketEntry.getCachedTime()); 2089 buffer.rewind(); 2090 ioEngine.write(buffer, offset); 2091 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 2092 } else { 2093 ioEngine.write(sliceBuf, offset); 2094 } 2095 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 2096 } else { 2097 // Only used for testing. 2098 ByteBuffer bb = ByteBuffer.allocate(len); 2099 data.serialize(bb, true); 2100 ioEngine.write(bb, offset); 2101 } 2102 succ = true; 2103 } finally { 2104 if (!succ) { 2105 alloc.freeBlock(offset, len); 2106 } 2107 } 2108 realCacheSize.add(len); 2109 return bucketEntry; 2110 } 2111 } 2112 2113 /** 2114 * Only used in test 2115 */ 2116 void stopWriterThreads() throws InterruptedException { 2117 for (WriterThread writerThread : writerThreads) { 2118 writerThread.disableWriter(); 2119 writerThread.interrupt(); 2120 writerThread.join(); 2121 } 2122 } 2123 2124 @Override 2125 public Iterator<CachedBlock> iterator() { 2126 // Don't bother with ramcache since stuff is in here only a little while. 2127 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 2128 return new Iterator<CachedBlock>() { 2129 private final long now = System.nanoTime(); 2130 2131 @Override 2132 public boolean hasNext() { 2133 return i.hasNext(); 2134 } 2135 2136 @Override 2137 public CachedBlock next() { 2138 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 2139 return new CachedBlock() { 2140 @Override 2141 public String toString() { 2142 return BlockCacheUtil.toString(this, now); 2143 } 2144 2145 @Override 2146 public BlockPriority getBlockPriority() { 2147 return e.getValue().getPriority(); 2148 } 2149 2150 @Override 2151 public BlockType getBlockType() { 2152 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 2153 return null; 2154 } 2155 2156 @Override 2157 public long getOffset() { 2158 return e.getKey().getOffset(); 2159 } 2160 2161 @Override 2162 public long getSize() { 2163 return e.getValue().getLength(); 2164 } 2165 2166 @Override 2167 public long getCachedTime() { 2168 return e.getValue().getCachedTime(); 2169 } 2170 2171 @Override 2172 public String getFilename() { 2173 return e.getKey().getHfileName(); 2174 } 2175 2176 @Override 2177 public int compareTo(CachedBlock other) { 2178 int diff = this.getFilename().compareTo(other.getFilename()); 2179 if (diff != 0) return diff; 2180 2181 diff = Long.compare(this.getOffset(), other.getOffset()); 2182 if (diff != 0) return diff; 2183 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 2184 throw new IllegalStateException( 2185 "" + this.getCachedTime() + ", " + other.getCachedTime()); 2186 } 2187 return Long.compare(other.getCachedTime(), this.getCachedTime()); 2188 } 2189 2190 @Override 2191 public int hashCode() { 2192 return e.getKey().hashCode(); 2193 } 2194 2195 @Override 2196 public boolean equals(Object obj) { 2197 if (obj instanceof CachedBlock) { 2198 CachedBlock cb = (CachedBlock) obj; 2199 return compareTo(cb) == 0; 2200 } else { 2201 return false; 2202 } 2203 } 2204 }; 2205 } 2206 2207 @Override 2208 public void remove() { 2209 throw new UnsupportedOperationException(); 2210 } 2211 }; 2212 } 2213 2214 @Override 2215 public BlockCache[] getBlockCaches() { 2216 return null; 2217 } 2218 2219 public int getRpcRefCount(BlockCacheKey cacheKey) { 2220 BucketEntry bucketEntry = backingMap.get(cacheKey); 2221 if (bucketEntry != null) { 2222 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1); 2223 } 2224 return 0; 2225 } 2226 2227 float getAcceptableFactor() { 2228 return acceptableFactor; 2229 } 2230 2231 float getMinFactor() { 2232 return minFactor; 2233 } 2234 2235 float getExtraFreeFactor() { 2236 return extraFreeFactor; 2237 } 2238 2239 float getSingleFactor() { 2240 return singleFactor; 2241 } 2242 2243 float getMultiFactor() { 2244 return multiFactor; 2245 } 2246 2247 float getMemoryFactor() { 2248 return memoryFactor; 2249 } 2250 2251 long getQueueAdditionWaitTime() { 2252 return queueAdditionWaitTime; 2253 } 2254 2255 long getPersistenceChunkSize() { 2256 return persistenceChunkSize; 2257 } 2258 2259 long getBucketcachePersistInterval() { 2260 return bucketcachePersistInterval; 2261 } 2262 2263 public String getPersistencePath() { 2264 return persistencePath; 2265 } 2266 2267 /** 2268 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 2269 */ 2270 static class RAMCache { 2271 /** 2272 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 2273 * {@link RAMCache#get(BlockCacheKey)} and 2274 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee 2275 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the 2276 * func method can execute exactly once only when the key is present(or absent) and under the 2277 * lock context. Otherwise, the reference count of block will be messed up. Notice that the 2278 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 2279 */ 2280 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 2281 2282 public boolean containsKey(BlockCacheKey key) { 2283 return delegate.containsKey(key); 2284 } 2285 2286 public RAMQueueEntry get(BlockCacheKey key) { 2287 return delegate.computeIfPresent(key, (k, re) -> { 2288 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 2289 // atomic, another thread may remove and release the block, when retaining in this thread we 2290 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 2291 re.getData().retain(); 2292 return re; 2293 }); 2294 } 2295 2296 /** 2297 * Return the previous associated value, or null if absent. It has the same meaning as 2298 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 2299 */ 2300 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 2301 AtomicBoolean absent = new AtomicBoolean(false); 2302 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 2303 // The RAMCache reference to this entry, so reference count should be increment. 2304 entry.getData().retain(); 2305 absent.set(true); 2306 return entry; 2307 }); 2308 return absent.get() ? null : re; 2309 } 2310 2311 public boolean remove(BlockCacheKey key) { 2312 return remove(key, re -> { 2313 }); 2314 } 2315 2316 /** 2317 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 2318 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 2319 * exception. the consumer will access entry to remove before release its reference count. 2320 * Notice, don't change its reference count in the {@link Consumer} 2321 */ 2322 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 2323 RAMQueueEntry previous = delegate.remove(key); 2324 action.accept(previous); 2325 if (previous != null) { 2326 previous.getData().release(); 2327 } 2328 return previous != null; 2329 } 2330 2331 public boolean isEmpty() { 2332 return delegate.isEmpty(); 2333 } 2334 2335 public void clear() { 2336 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 2337 while (it.hasNext()) { 2338 RAMQueueEntry re = it.next().getValue(); 2339 it.remove(); 2340 re.getData().release(); 2341 } 2342 } 2343 2344 public boolean hasBlocksForFile(String fileName) { 2345 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 2346 .findFirst().isPresent(); 2347 } 2348 } 2349 2350 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 2351 return backingMap; 2352 } 2353 2354 public AtomicBoolean getBackingMapValidated() { 2355 return backingMapValidated; 2356 } 2357 2358 @Override 2359 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2360 return Optional.of(fullyCachedFiles); 2361 } 2362 2363 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2364 if (cacheConf.getBlockCache().isPresent()) { 2365 BlockCache bc = cacheConf.getBlockCache().get(); 2366 if (bc instanceof CombinedBlockCache) { 2367 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2368 if (l2 instanceof BucketCache) { 2369 return Optional.of((BucketCache) l2); 2370 } 2371 } else if (bc instanceof BucketCache) { 2372 return Optional.of((BucketCache) bc); 2373 } 2374 } 2375 return Optional.empty(); 2376 } 2377 2378 @Override 2379 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2380 long size) { 2381 // block eviction may be happening in the background as prefetch runs, 2382 // so we need to count all blocks for this file in the backing map under 2383 // a read lock for the block offset 2384 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2385 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2386 fileName, totalBlockCount, dataBlockCount); 2387 try { 2388 final MutableInt count = new MutableInt(); 2389 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2390 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2391 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2392 result = getAllCacheKeysForFile( 2393 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2394 Long.MAX_VALUE); 2395 } 2396 result.stream().forEach(entry -> { 2397 LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}", 2398 fileName.getName(), entry.getOffset()); 2399 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2400 lock.readLock().lock(); 2401 locks.add(lock); 2402 if (backingMap.containsKey(entry) && entry.getBlockType().isData()) { 2403 count.increment(); 2404 } 2405 }); 2406 // BucketCache would only have data blocks 2407 if (dataBlockCount == count.getValue()) { 2408 LOG.debug("File {} has now been fully cached.", fileName); 2409 fileCacheCompleted(fileName, size); 2410 } else { 2411 LOG.debug( 2412 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2413 + "Total data blocks for file: {}. " 2414 + "Checking for blocks pending cache in cache writer queue.", 2415 fileName, count.getValue(), dataBlockCount); 2416 if (ramCache.hasBlocksForFile(fileName.getName())) { 2417 for (ReentrantReadWriteLock lock : locks) { 2418 lock.readLock().unlock(); 2419 } 2420 locks.clear(); 2421 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2422 + "and try the verification again.", fileName); 2423 Thread.sleep(100); 2424 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2425 } else { 2426 LOG.info( 2427 "The total block count was {}. We found only {} data blocks cached from " 2428 + "a total of {} data blocks for file {}, " 2429 + "but no blocks pending caching. Maybe cache is full or evictions " 2430 + "happened concurrently to cache prefetch.", 2431 totalBlockCount, count, dataBlockCount, fileName); 2432 } 2433 } 2434 } catch (InterruptedException e) { 2435 throw new RuntimeException(e); 2436 } finally { 2437 for (ReentrantReadWriteLock lock : locks) { 2438 lock.readLock().unlock(); 2439 } 2440 } 2441 } 2442 2443 @Override 2444 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2445 if (!isCacheInitialized("blockFitsIntoTheCache")) { 2446 return Optional.of(false); 2447 } 2448 2449 long currentUsed = bucketAllocator.getUsedSize(); 2450 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2451 return Optional.of(result); 2452 } 2453 2454 @Override 2455 public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) { 2456 String fileName = hFileInfo.getHFileContext().getHFileName(); 2457 DataTieringManager dataTieringManager = DataTieringManager.getInstance(); 2458 if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) { 2459 LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName); 2460 return Optional.of(false); 2461 } 2462 // if we don't have the file in fullyCachedFiles, we should cache it 2463 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2464 } 2465 2466 @Override 2467 public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp, 2468 Configuration conf) { 2469 DataTieringManager dataTieringManager = DataTieringManager.getInstance(); 2470 if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) { 2471 LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", 2472 key.getHfileName()); 2473 return Optional.of(false); 2474 } 2475 return Optional.of(true); 2476 } 2477 2478 @Override 2479 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2480 boolean foundKey = backingMap.containsKey(key); 2481 // if there's no entry for the key itself, we need to check if this key is for a reference, 2482 // and if so, look for a block from the referenced file using this getBlockForReference method. 2483 return Optional.of(foundKey ? true : getBlockForReference(key) != null); 2484 } 2485 2486 @Override 2487 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2488 BucketEntry entry = backingMap.get(key); 2489 if (entry == null) { 2490 // the key might be for a reference tha we had found the block from the referenced file in 2491 // the cache when we first tried to cache it. 2492 entry = getBlockForReference(key); 2493 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); 2494 } else { 2495 return Optional.of(entry.getOnDiskSizeWithHeader()); 2496 } 2497 2498 } 2499 2500 boolean isCacheInitialized(String api) { 2501 if (cacheState == CacheState.INITIALIZING) { 2502 LOG.warn("Bucket initialisation pending at {}", api); 2503 return false; 2504 } 2505 return true; 2506 } 2507 2508 @Override 2509 public boolean waitForCacheInitialization(long timeout) { 2510 try { 2511 while (cacheState == CacheState.INITIALIZING) { 2512 if (timeout <= 0) { 2513 break; 2514 } 2515 Thread.sleep(100); 2516 timeout -= 100; 2517 } 2518 } finally { 2519 return isCacheEnabled(); 2520 } 2521 } 2522}