001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.NavigableSet; 036import java.util.Optional; 037import java.util.PriorityQueue; 038import java.util.Set; 039import java.util.concurrent.ArrayBlockingQueue; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.Executors; 045import java.util.concurrent.ScheduledExecutorService; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.Lock; 051import java.util.concurrent.locks.ReentrantLock; 052import java.util.concurrent.locks.ReentrantReadWriteLock; 053import java.util.function.Consumer; 054import java.util.function.Function; 055import org.apache.commons.io.IOUtils; 056import org.apache.commons.lang3.mutable.MutableInt; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.hbase.HBaseConfiguration; 060import org.apache.hadoop.hbase.HBaseIOException; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.io.ByteBuffAllocator; 064import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 065import org.apache.hadoop.hbase.io.HeapSize; 066import org.apache.hadoop.hbase.io.hfile.BlockCache; 067import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 068import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 069import org.apache.hadoop.hbase.io.hfile.BlockPriority; 070import org.apache.hadoop.hbase.io.hfile.BlockType; 071import org.apache.hadoop.hbase.io.hfile.CacheConfig; 072import org.apache.hadoop.hbase.io.hfile.CacheStats; 073import org.apache.hadoop.hbase.io.hfile.Cacheable; 074import org.apache.hadoop.hbase.io.hfile.CachedBlock; 075import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 076import org.apache.hadoop.hbase.io.hfile.HFileBlock; 077import org.apache.hadoop.hbase.io.hfile.HFileContext; 078import 
org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory ({@link ByteBufferIOEngine}), mmap
 * ({@link ExclusiveMemoryMmapIOEngine}), pmem ({@link SharedMemoryMmapIOEngine}) or local files
 * ({@link FileIOEngine}) to store/read the block data.
 * <p>
 * Eviction uses an algorithm similar to that of
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 * <p>
 * BucketCache is mainly used as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with an on-heap
 * BlockCache, to decrease CMS GC and heap fragmentation.
 * <p>
 * It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
    "hbase.bucketcache.persistence.chunksize";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /**
   * The cache age of blocks to check if the related file is present on any online regions.
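   * The cached time of a block is tracked in nanoseconds, so {@link #freeSpace(String)} converts
   * this grace period before comparing, roughly as sketched below (this mirrors the check already
   * done in freeSpace):
   * <pre>
   *   long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
   *   boolean oldEnough = entry.getCachedTime() &lt; (System.nanoTime() - orphanGracePeriodNanos);
   * </pre>
   * Blocks older than the grace period whose file is not part of any online region are treated as
   * orphans and evicted first during a freeSpace run.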
   */
  static final String BLOCK_ORPHAN_GRACE_PERIOD =
    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";

  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;

  // In this map, store the block's metadata, such as offset and length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> (region, file size). This map is used to track all files that have completed
   * prefetch, together with the region they belong to and the total cached size for that region.
   */
  final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
  /**
   * Map of region -> total size of the region prefetched on this region server. This is the total
   * size of hFiles for this region prefetched on this region server.
   */
  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();

  private BucketCachePersister cachePersister;

  /**
   * Enum to represent the state of cache
   */
  protected enum CacheState {
    // Initializing: State when the cache is being initialised from persistence.
    INITIALIZING,
    // Enabled: State when cache is initialised and is ready.
    ENABLED,
    // Disabled: State when the cache is disabled.
    DISABLED
  }

  /**
   * Flag for whether the cache is enabled or not. We shut it off if there are IO errors for some
   * time, so that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile CacheState cacheState;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
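   * <p>
   * A minimal sketch of the key-to-queue assignment (mirroring what cacheBlockWithWaitInternal
   * does further down in this class):
   * <pre>
   *   int queueNum = (cacheKey.hashCode() &amp; 0x7FFFFFFF) % writerQueues.size();
   *   BlockingQueue&lt;RAMQueueEntry&gt; bq = writerQueues.get(queueNum);
   * </pre>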
210 */ 211 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 212 transient final WriterThread[] writerThreads; 213 214 /** Volatile boolean to track if free space is in process or not */ 215 private volatile boolean freeInProgress = false; 216 private transient final Lock freeSpaceLock = new ReentrantLock(); 217 218 private final LongAdder realCacheSize = new LongAdder(); 219 private final LongAdder heapSize = new LongAdder(); 220 /** Current number of cached elements */ 221 private final LongAdder blockNumber = new LongAdder(); 222 223 /** Cache access count (sequential ID) */ 224 private final AtomicLong accessCount = new AtomicLong(); 225 226 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 227 228 private final BucketCacheStats cacheStats = new BucketCacheStats(); 229 private final String persistencePath; 230 static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); 231 private final long cacheCapacity; 232 /** Approximate block size */ 233 private final long blockSize; 234 235 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 236 private final int ioErrorsTolerationDuration; 237 // 1 min 238 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 239 240 // Start time of first IO error when reading or writing IO Engine, it will be 241 // reset after a successful read/write. 242 private volatile long ioErrorStartTime = -1; 243 244 private Configuration conf; 245 246 /** 247 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 248 * this is to avoid freeing the block which is being read. 249 * <p> 250 */ 251 transient final IdReadWriteLock<Long> offsetLock; 252 253 NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>( 254 Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 255 256 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 257 private transient final ScheduledExecutorService scheduleThreadPool = 258 Executors.newScheduledThreadPool(1, 259 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 260 261 // Allocate or free space for the block 262 private transient BucketAllocator bucketAllocator; 263 264 /** Acceptable size of cache (no evictions if size < acceptable) */ 265 private float acceptableFactor; 266 267 /** Minimum threshold of cache (when evicting, evict until size < min) */ 268 private float minFactor; 269 270 /** 271 * Free this floating point factor of extra blocks when evicting. 
For example, free the number of
   * blocks requested * (1 + extraFreeFactor).
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use one of the {@link java.security.MessageDigest} digest algorithms to check persistent file
   * integrity; the default algorithm is MD5.
   */
  private String algorithm;

  private long persistenceChunkSize;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  private Map<String, HRegion> onlineRegions;

  private long orphanBlockGracePeriod = 0;

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, ioErrorsTolerationDuration, conf, null);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache block size is set to " + blockSize + ", but it must be greater than 0");
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.conf = conf;
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    this.onlineRegions = onlineRegions;
    this.orphanBlockGracePeriod =
      conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
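      // (A rough back-of-the-envelope check, assuming a typical 16KB block size:
      // Integer.MAX_VALUE entries * 16KB is about 32TB; the exact bound depends on blockSize.)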
345 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 346 } 347 348 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 349 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 350 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 351 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 352 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 353 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 354 this.queueAdditionWaitTime = 355 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 356 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 357 this.persistenceChunkSize = 358 conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); 359 if (this.persistenceChunkSize <= 0) { 360 persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 361 } 362 363 sanityCheckConfigs(); 364 365 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 366 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 367 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 368 + ", useStrongRef: " + useStrongRef); 369 370 this.cacheCapacity = capacity; 371 this.persistencePath = persistencePath; 372 this.blockSize = blockSize; 373 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 374 this.cacheState = CacheState.INITIALIZING; 375 376 this.allocFailLogPrevTs = 0; 377 378 for (int i = 0; i < writerThreads.length; ++i) { 379 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 380 } 381 382 assert writerQueues.size() == writerThreads.length; 383 this.ramCache = new RAMCache(); 384 385 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 386 instantiateWriterThreads(); 387 388 if (isCachePersistent()) { 389 if (ioEngine instanceof FileIOEngine) { 390 startBucketCachePersisterThread(); 391 } 392 startPersistenceRetriever(bucketSizes, capacity); 393 } else { 394 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 395 this.cacheState = CacheState.ENABLED; 396 startWriterThreads(); 397 } 398 399 // Run the statistics thread periodically to print the cache statistics log 400 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 401 // every five minutes. 402 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 403 statThreadPeriod, TimeUnit.SECONDS); 404 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 405 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 406 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 407 + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); 408 } 409 410 private void startPersistenceRetriever(int[] bucketSizes, long capacity) { 411 Runnable persistentCacheRetriever = () -> { 412 try { 413 retrieveFromFile(bucketSizes); 414 LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); 415 } catch (Throwable ex) { 416 LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt." 
          + " Exception seen: ", persistencePath, ex);
        backingMap.clear();
        fullyCachedFiles.clear();
        backingMapValidated.set(true);
        regionCachedSize.clear();
        try {
          bucketAllocator = new BucketAllocator(capacity, bucketSizes);
        } catch (BucketAllocatorException allocatorException) {
          LOG.error("Exception during Bucket Allocation", allocatorException);
        }
      } finally {
        this.cacheState = CacheState.ENABLED;
        startWriterThreads();
      }
    };
    Thread t = new Thread(persistentCacheRetriever);
    t.start();
  }

  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than or equal to 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
  }

  /**
   * Called by the constructor to instantiate the writer threads.
   */
  private void instantiateWriterThreads() {
    final String threadName = Thread.currentThread().getName();
    for (int i = 0; i < this.writerThreads.length; ++i) {
      this.writerThreads[i] = new WriterThread(this.writerQueues.get(i));
      this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      this.writerThreads[i].setDaemon(true);
    }
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
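   * <p>
   * A hypothetical sketch of such an override (the construction arguments are illustrative only):
   * <pre>
   *   BucketCache cache = new BucketCache(ioEngineName, capacity, blockSize, bucketSizes,
   *     writerThreadNum, writerQLen, persistencePath) {
   *     &#64;Override
   *     protected void startWriterThreads() {
   *       // no-op, so a test can drain the writer queues deterministically itself
   *     }
   *   };
   * </pre>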
471 */ 472 protected void startWriterThreads() { 473 for (WriterThread thread : writerThreads) { 474 thread.start(); 475 } 476 } 477 478 void startBucketCachePersisterThread() { 479 LOG.info("Starting BucketCachePersisterThread"); 480 cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); 481 cachePersister.setDaemon(true); 482 cachePersister.start(); 483 } 484 485 @Override 486 public boolean isCacheEnabled() { 487 return this.cacheState == CacheState.ENABLED; 488 } 489 490 @Override 491 public long getMaxSize() { 492 return this.cacheCapacity; 493 } 494 495 public String getIoEngine() { 496 return ioEngine.toString(); 497 } 498 499 /** 500 * Get the IOEngine from the IO engine name 501 * @return the IOEngine 502 */ 503 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 504 throws IOException { 505 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 506 // In order to make the usage simple, we only need the prefix 'files:' in 507 // document whether one or multiple file(s), but also support 'file:' for 508 // the compatibility 509 String[] filePaths = 510 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 511 return new FileIOEngine(capacity, persistencePath != null, filePaths); 512 } else if (ioEngineName.startsWith("offheap")) { 513 return new ByteBufferIOEngine(capacity); 514 } else if (ioEngineName.startsWith("mmap:")) { 515 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 516 } else if (ioEngineName.startsWith("pmem:")) { 517 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 518 // device. Since the persistent memory device has its own address space the contents 519 // mapped to this address space does not get swapped out like in the case of mmapping 520 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 521 // can be directly referred to without having to copy them onheap. Once the RPC is done, 522 // the blocks can be returned back as in case of ByteBufferIOEngine. 523 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 524 } else { 525 throw new IllegalArgumentException( 526 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 527 } 528 } 529 530 public boolean isCachePersistenceEnabled() { 531 return persistencePath != null; 532 } 533 534 /** 535 * Cache the block with the specified name and buffer. 536 * @param cacheKey block's cache key 537 * @param buf block buffer 538 */ 539 @Override 540 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 541 cacheBlock(cacheKey, buf, false); 542 } 543 544 /** 545 * Cache the block with the specified name and buffer. 546 * @param cacheKey block's cache key 547 * @param cachedItem block buffer 548 * @param inMemory if block is in-memory 549 */ 550 @Override 551 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 552 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 553 } 554 555 /** 556 * Cache the block with the specified name and buffer. 
557 * @param cacheKey block's cache key 558 * @param cachedItem block buffer 559 * @param inMemory if block is in-memory 560 */ 561 @Override 562 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 563 boolean waitWhenCache) { 564 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 565 } 566 567 /** 568 * Cache the block to ramCache 569 * @param cacheKey block's cache key 570 * @param cachedItem block buffer 571 * @param inMemory if block is in-memory 572 * @param wait if true, blocking wait when queue is full 573 */ 574 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 575 boolean wait) { 576 if (isCacheEnabled()) { 577 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 578 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 579 BucketEntry bucketEntry = backingMap.get(cacheKey); 580 if (bucketEntry != null && bucketEntry.isRpcRef()) { 581 // avoid replace when there are RPC refs for the bucket entry in bucket cache 582 return; 583 } 584 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 585 } 586 } else { 587 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 588 } 589 } 590 } 591 592 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 593 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 594 } 595 596 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 597 boolean inMemory, boolean wait) { 598 if (!isCacheEnabled()) { 599 return; 600 } 601 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 602 cacheKey.setBlockType(cachedItem.getBlockType()); 603 } 604 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 605 // Stuff the entry into the RAM cache so it can get drained to the persistent store 606 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 607 inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine); 608 /** 609 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 610 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 611 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 612 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 613 * one, then the heap size will mess up (HBASE-20789) 614 */ 615 if (ramCache.putIfAbsent(cacheKey, re) != null) { 616 return; 617 } 618 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 619 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 620 boolean successfulAddition = false; 621 if (wait) { 622 try { 623 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 624 } catch (InterruptedException e) { 625 Thread.currentThread().interrupt(); 626 } 627 } else { 628 successfulAddition = bq.offer(re); 629 } 630 if (!successfulAddition) { 631 LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey); 632 ramCache.remove(cacheKey); 633 cacheStats.failInsert(); 634 } else { 635 this.blockNumber.increment(); 636 this.heapSize.add(cachedItem.heapSize()); 637 } 638 } 639 640 /** 641 * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this 642 * method looks for the block from the referred file, in the cache. 
If present in the cache, the 643 * block for the referred file is returned, otherwise, this method returns null. It will also 644 * return null if the passed cache key doesn't relate to a reference. 645 * @param key the BlockCacheKey instance to look for in the cache. 646 * @return the cached block from the referred file, null if there's no such block in the cache or 647 * the passed key doesn't relate to a reference. 648 */ 649 public BucketEntry getBlockForReference(BlockCacheKey key) { 650 BucketEntry foundEntry = null; 651 String referredFileName = null; 652 if (StoreFileInfo.isReference(key.getHfileName())) { 653 referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); 654 } 655 if (referredFileName != null) { 656 BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); 657 foundEntry = backingMap.get(convertedCacheKey); 658 LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), 659 convertedCacheKey, foundEntry); 660 } 661 return foundEntry; 662 } 663 664 /** 665 * Get the buffer of the block with the specified key. 666 * @param key block's cache key 667 * @param caching true if the caller caches blocks on cache misses 668 * @param repeat Whether this is a repeat lookup for the same block 669 * @param updateCacheMetrics Whether we should update cache metrics or not 670 * @return buffer of specified cache key, or null if not in cache 671 */ 672 @Override 673 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 674 boolean updateCacheMetrics) { 675 if (!isCacheEnabled()) { 676 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 677 return null; 678 } 679 RAMQueueEntry re = ramCache.get(key); 680 if (re != null) { 681 if (updateCacheMetrics) { 682 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 683 } 684 re.access(accessCount.incrementAndGet()); 685 return re.getData(); 686 } 687 BucketEntry bucketEntry = backingMap.get(key); 688 if (bucketEntry == null) { 689 bucketEntry = getBlockForReference(key); 690 } 691 if (bucketEntry != null) { 692 long start = System.nanoTime(); 693 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 694 try { 695 lock.readLock().lock(); 696 // We can not read here even if backingMap does contain the given key because its offset 697 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 698 // existence here. 699 if ( 700 bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) 701 ) { 702 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 703 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 704 // the same BucketEntry, then all of the three will share the same refCnt. 705 Cacheable cachedBlock = ioEngine.read(bucketEntry); 706 if (ioEngine.usesSharedMemory()) { 707 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 708 // same RefCnt, do retain here, in order to count the number of RPC references 709 cachedBlock.retain(); 710 } 711 // Update the cache statistics. 
712 if (updateCacheMetrics) { 713 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 714 cacheStats.ioHit(System.nanoTime() - start); 715 } 716 bucketEntry.access(accessCount.incrementAndGet()); 717 if (this.ioErrorStartTime > 0) { 718 ioErrorStartTime = -1; 719 } 720 return cachedBlock; 721 } 722 } catch (HBaseIOException hioex) { 723 // When using file io engine persistent cache, 724 // the cache map state might differ from the actual cache. If we reach this block, 725 // we should remove the cache key entry from the backing map 726 backingMap.remove(key); 727 fullyCachedFiles.remove(key.getHfileName()); 728 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 729 } catch (IOException ioex) { 730 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 731 checkIOErrorIsTolerated(); 732 } finally { 733 lock.readLock().unlock(); 734 } 735 } 736 if (!repeat && updateCacheMetrics) { 737 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 738 } 739 return null; 740 } 741 742 /** 743 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 744 */ 745 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 746 boolean evictedByEvictionProcess) { 747 bucketEntry.markAsEvicted(); 748 blocksByHFile.remove(cacheKey); 749 if (decrementBlockNumber) { 750 this.blockNumber.decrement(); 751 if (ioEngine.isPersistent()) { 752 fileNotFullyCached(cacheKey.getHfileName()); 753 } 754 } 755 if (evictedByEvictionProcess) { 756 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 757 } 758 if (ioEngine.isPersistent()) { 759 setCacheInconsistent(true); 760 } 761 } 762 763 private void fileNotFullyCached(String hfileName) { 764 // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted 765 if (fullyCachedFiles.containsKey(hfileName)) { 766 Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); 767 String regionEncodedName = regionEntry.getFirst(); 768 long filePrefetchSize = regionEntry.getSecond(); 769 LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); 770 regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); 771 // If all the blocks for a region are evicted from the cache, remove the entry for that region 772 if ( 773 regionCachedSize.containsKey(regionEncodedName) 774 && regionCachedSize.get(regionEncodedName) == 0 775 ) { 776 regionCachedSize.remove(regionEncodedName); 777 } 778 } 779 fullyCachedFiles.remove(hfileName); 780 } 781 782 public void fileCacheCompleted(Path filePath, long size) { 783 Pair<String, Long> pair = new Pair<>(); 784 // sets the region name 785 String regionName = filePath.getParent().getParent().getName(); 786 pair.setFirst(regionName); 787 pair.setSecond(size); 788 fullyCachedFiles.put(filePath.getName(), pair); 789 } 790 791 private void updateRegionCachedSize(Path filePath, long cachedSize) { 792 if (filePath != null) { 793 String regionName = filePath.getParent().getParent().getName(); 794 regionCachedSize.merge(regionName, cachedSize, 795 (previousSize, newBlockSize) -> previousSize + newBlockSize); 796 } 797 } 798 799 /** 800 * Free the {{@link BucketEntry} actually,which could only be invoked when the 801 * {@link BucketEntry#refCnt} becoming 0. 
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * First, we'll try to remove the block from RAMCache, and then try to evict it from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from the
   * bucket cache by calling {@link BucketEntry#markAsEvicted}. If there are still RPCs referring to
   * this block, the block can only be de-allocated once all of them release it.
   * <p>
   * NOTICE: we need to grab the write offset lock before releasing the reference from the bucket
   * cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which would be a
   * memory leak.
   * @param cacheKey Block to evict
   * @return true if the block has been evicted, false otherwise.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
   * {@link BucketCache#ramCache}. <br/>
   * NOTE: when evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
   * and {@link BucketEntry} pair is removed.
   * @param cacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
   * @return true if the block has been evicted, false otherwise.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!isCacheEnabled()) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   * it is the return value of current {@link BucketCache#createRecycler} method.
875 * 876 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 877 * it is {@link ByteBuffAllocator#putbackBuffer}. 878 * </pre> 879 */ 880 public Recycler createRecycler(final BucketEntry bucketEntry) { 881 return () -> { 882 freeBucketEntry(bucketEntry); 883 return; 884 }; 885 } 886 887 /** 888 * NOTE: This method is only for test. 889 */ 890 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 891 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 892 if (bucketEntry == null) { 893 return false; 894 } 895 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 896 } 897 898 /** 899 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 900 * {@link BucketEntry#isRpcRef} is false. <br/> 901 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 902 * {@link BucketEntry} could be removed. 903 * @param blockCacheKey {@link BlockCacheKey} to evict. 904 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 905 * @return true to indicate whether we've evicted successfully or not. 906 */ 907 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 908 if (!bucketEntry.isRpcRef()) { 909 return doEvictBlock(blockCacheKey, bucketEntry, true); 910 } 911 return false; 912 } 913 914 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 915 return ramCache.remove(cacheKey, re -> { 916 if (re != null) { 917 this.blockNumber.decrement(); 918 this.heapSize.add(-1 * re.getData().heapSize()); 919 } 920 }); 921 } 922 923 public boolean isCacheInconsistent() { 924 return isCacheInconsistent.get(); 925 } 926 927 public void setCacheInconsistent(boolean setCacheInconsistent) { 928 isCacheInconsistent.set(setCacheInconsistent); 929 } 930 931 protected void setCacheState(CacheState state) { 932 cacheState = state; 933 } 934 935 /* 936 * Statistics thread. Periodically output cache statistics to the log. 937 */ 938 private static class StatisticsThread extends Thread { 939 private final BucketCache bucketCache; 940 941 public StatisticsThread(BucketCache bucketCache) { 942 super("BucketCacheStatsThread"); 943 setDaemon(true); 944 this.bucketCache = bucketCache; 945 } 946 947 @Override 948 public void run() { 949 bucketCache.logStats(); 950 } 951 } 952 953 public void logStats() { 954 if (!isCacheInitialized("BucketCache::logStats")) { 955 return; 956 } 957 958 long totalSize = bucketAllocator.getTotalSize(); 959 long usedSize = bucketAllocator.getUsedSize(); 960 long freeSize = totalSize - usedSize; 961 long cacheSize = getRealCacheSize(); 962 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 963 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 964 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 965 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 966 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 967 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 968 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 969 + (cacheStats.getHitCount() == 0 970 ? 
"0," 971 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 972 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 973 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 974 + (cacheStats.getHitCachingCount() == 0 975 ? "0," 976 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 977 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 978 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 979 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount=" 980 + backingMap.size()); 981 cacheStats.reset(); 982 983 bucketAllocator.logDebugStatistics(); 984 } 985 986 public long getRealCacheSize() { 987 return this.realCacheSize.sum(); 988 } 989 990 public long acceptableSize() { 991 if (!isCacheInitialized("BucketCache::acceptableSize")) { 992 return 0; 993 } 994 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 995 } 996 997 long getPartitionSize(float partitionFactor) { 998 if (!isCacheInitialized("BucketCache::getPartitionSize")) { 999 return 0; 1000 } 1001 1002 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 1003 } 1004 1005 /** 1006 * Return the count of bucketSizeinfos still need free space 1007 */ 1008 private int bucketSizesAboveThresholdCount(float minFactor) { 1009 if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { 1010 return 0; 1011 } 1012 1013 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1014 int fullCount = 0; 1015 for (int i = 0; i < stats.length; i++) { 1016 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1017 freeGoal = Math.max(freeGoal, 1); 1018 if (stats[i].freeCount() < freeGoal) { 1019 fullCount++; 1020 } 1021 } 1022 return fullCount; 1023 } 1024 1025 /** 1026 * This method will find the buckets that are minimally occupied and are not reference counted and 1027 * will free them completely without any constraint on the access times of the elements, and as a 1028 * process will completely free at most the number of buckets passed, sometimes it might not due 1029 * to changing refCounts 1030 * @param completelyFreeBucketsNeeded number of buckets to free 1031 **/ 1032 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 1033 if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { 1034 return; 1035 } 1036 1037 if (completelyFreeBucketsNeeded != 0) { 1038 // First we will build a set where the offsets are reference counted, usually 1039 // this set is small around O(Handler Count) unless something else is wrong 1040 Set<Integer> inUseBuckets = new HashSet<>(); 1041 backingMap.forEach((k, be) -> { 1042 if (be.isRpcRef()) { 1043 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 1044 } 1045 }); 1046 Set<Integer> candidateBuckets = 1047 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 1048 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 1049 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 1050 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 1051 } 1052 } 1053 } 1054 } 1055 1056 private long calculateBytesToFree(StringBuilder msgBuffer) { 1057 long bytesToFreeWithoutExtra = 0; 1058 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1059 long[] bytesToFreeForBucket = new long[stats.length]; 
1060 for (int i = 0; i < stats.length; i++) { 1061 bytesToFreeForBucket[i] = 0; 1062 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1063 freeGoal = Math.max(freeGoal, 1); 1064 if (stats[i].freeCount() < freeGoal) { 1065 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1066 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1067 if (msgBuffer != null) { 1068 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1069 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1070 } 1071 } 1072 } 1073 if (msgBuffer != null) { 1074 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1075 } 1076 return bytesToFreeWithoutExtra; 1077 } 1078 1079 /** 1080 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 1081 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 1082 * blocks evicted 1083 * @param why Why we are being called 1084 */ 1085 void freeSpace(final String why) { 1086 if (!isCacheInitialized("BucketCache::freeSpace")) { 1087 return; 1088 } 1089 // Ensure only one freeSpace progress at a time 1090 if (!freeSpaceLock.tryLock()) { 1091 return; 1092 } 1093 try { 1094 freeInProgress = true; 1095 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1096 long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer); 1097 if (bytesToFreeWithoutExtra <= 0) { 1098 return; 1099 } 1100 long currentSize = bucketAllocator.getUsedSize(); 1101 long totalSize = bucketAllocator.getTotalSize(); 1102 if (LOG.isDebugEnabled() && msgBuffer != null) { 1103 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used=" 1104 + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1105 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1106 + StringUtils.byteDesc(totalSize)); 1107 } 1108 long bytesToFreeWithExtra = 1109 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1110 // Instantiate priority buckets 1111 BucketEntryGroup bucketSingle = 1112 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1113 BucketEntryGroup bucketMulti = 1114 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1115 BucketEntryGroup bucketMemory = 1116 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1117 1118 Set<String> allValidFiles = null; 1119 // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not. 1120 // See further comments below for more details. 1121 if (onlineRegions != null) { 1122 allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions); 1123 } 1124 // the cached time is recored in nanos, so we need to convert the grace period accordingly 1125 long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000; 1126 long bytesFreed = 0; 1127 // Scan entire map putting bucket entry into appropriate bucket entry 1128 // group 1129 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1130 BlockCacheKey key = bucketEntryWithKey.getKey(); 1131 BucketEntry entry = bucketEntryWithKey.getValue(); 1132 // Under certain conditions, blocks for regions not on the current region server might 1133 // be hanging on the cache. 
For example, when using the persistent cache feature, if the 1134 // RS crashes, then if not the same regions are assigned back once its online again, blocks 1135 // for the previous online regions would be recovered and stay in the cache. These would be 1136 // "orphan" blocks, as the files these blocks belong to are not in any of the online 1137 // regions. 1138 // "Orphan" blocks are a pure waste of cache space and should be evicted first during 1139 // the freespace run. 1140 // Compactions and Flushes may cache blocks before its files are completely written. In 1141 // these cases the file won't be found in any of the online regions stores, but the block 1142 // shouldn't be evicted. To avoid this, we defined this 1143 // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace 1144 // period (default 24 hours) where a block should be checked if it's an orphan block. 1145 if ( 1146 allValidFiles != null 1147 && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos) 1148 ) { 1149 if (!allValidFiles.contains(key.getHfileName())) { 1150 if (evictBucketEntryIfNoRpcReferenced(key, entry)) { 1151 // We calculate the freed bytes, but we don't stop if the goal was reached because 1152 // these are orphan blocks anyway, so let's leverage this run of freeSpace 1153 // to get rid of all orphans at once. 1154 bytesFreed += entry.getLength(); 1155 continue; 1156 } 1157 } 1158 } 1159 switch (entry.getPriority()) { 1160 case SINGLE: { 1161 bucketSingle.add(bucketEntryWithKey); 1162 break; 1163 } 1164 case MULTI: { 1165 bucketMulti.add(bucketEntryWithKey); 1166 break; 1167 } 1168 case MEMORY: { 1169 bucketMemory.add(bucketEntryWithKey); 1170 break; 1171 } 1172 } 1173 } 1174 PriorityQueue<BucketEntryGroup> bucketQueue = 1175 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1176 1177 bucketQueue.add(bucketSingle); 1178 bucketQueue.add(bucketMulti); 1179 bucketQueue.add(bucketMemory); 1180 1181 int remainingBuckets = bucketQueue.size(); 1182 1183 BucketEntryGroup bucketGroup; 1184 while ((bucketGroup = bucketQueue.poll()) != null) { 1185 long overflow = bucketGroup.overflow(); 1186 if (overflow > 0) { 1187 long bucketBytesToFree = 1188 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1189 bytesFreed += bucketGroup.free(bucketBytesToFree); 1190 } 1191 remainingBuckets--; 1192 } 1193 1194 // Check and free if there are buckets that still need freeing of space 1195 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1196 bucketQueue.clear(); 1197 remainingBuckets = 3; 1198 bucketQueue.add(bucketSingle); 1199 bucketQueue.add(bucketMulti); 1200 bucketQueue.add(bucketMemory); 1201 while ((bucketGroup = bucketQueue.poll()) != null) { 1202 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1203 bytesFreed += bucketGroup.free(bucketBytesToFree); 1204 remainingBuckets--; 1205 } 1206 } 1207 // Even after the above free we might still need freeing because of the 1208 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 1209 // there might be some buckets where the occupancy is very sparse and thus are not 1210 // yielding the free for the other bucket sizes, the fix for this to evict some 1211 // of the buckets, we do this by evicting the buckets that are least fulled 1212 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1213 1214 if (LOG.isDebugEnabled()) { 1215 long single = bucketSingle.totalSize(); 1216 long multi = 
bucketMulti.totalSize(); 1217 long memory = bucketMemory.totalSize(); 1218 if (LOG.isDebugEnabled()) { 1219 LOG.debug("Bucket cache free space completed; " + "freed=" 1220 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1221 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1222 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1223 } 1224 } 1225 } catch (Throwable t) { 1226 LOG.warn("Failed freeing space", t); 1227 } finally { 1228 cacheStats.evict(); 1229 freeInProgress = false; 1230 freeSpaceLock.unlock(); 1231 } 1232 } 1233 1234 // This handles flushing the RAM cache to IOEngine. 1235 class WriterThread extends Thread { 1236 private final BlockingQueue<RAMQueueEntry> inputQueue; 1237 private volatile boolean writerEnabled = true; 1238 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1239 1240 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1241 super("BucketCacheWriterThread"); 1242 this.inputQueue = queue; 1243 } 1244 1245 // Used for test 1246 void disableWriter() { 1247 this.writerEnabled = false; 1248 } 1249 1250 @Override 1251 public void run() { 1252 List<RAMQueueEntry> entries = new ArrayList<>(); 1253 try { 1254 while (isCacheEnabled() && writerEnabled) { 1255 try { 1256 try { 1257 // Blocks 1258 entries = getRAMQueueEntries(inputQueue, entries); 1259 } catch (InterruptedException ie) { 1260 if (!isCacheEnabled() || !writerEnabled) { 1261 break; 1262 } 1263 } 1264 doDrain(entries, metaBuff); 1265 } catch (Exception ioe) { 1266 LOG.error("WriterThread encountered error", ioe); 1267 } 1268 } 1269 } catch (Throwable t) { 1270 LOG.warn("Failed doing drain", t); 1271 } 1272 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1273 } 1274 } 1275 1276 /** 1277 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1278 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1279 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1280 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1281 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1282 * bucketAllocator do not free its memory. 1283 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1284 * cacheKey, Cacheable newBlock) 1285 * @param key Block cache key 1286 * @param bucketEntry Bucket entry to put into backingMap. 1287 */ 1288 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1289 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1290 blocksByHFile.add(key); 1291 updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength()); 1292 if (previousEntry != null && previousEntry != bucketEntry) { 1293 previousEntry.withWriteLock(offsetLock, () -> { 1294 blockEvicted(key, previousEntry, false, false); 1295 return null; 1296 }); 1297 } 1298 } 1299 1300 /** 1301 * Prepare and return a warning message for Bucket Allocator Exception 1302 * @param fle The exception 1303 * @param re The RAMQueueEntry for which the exception was thrown. 1304 * @return A warning message created from the input RAMQueueEntry object. 
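   *         For example (table, family, and file names here are purely illustrative), the method
   *         below produces text of the form:
   *         "Most recent failed allocation after 60000 ms; Table: t1 CF: cf1 HFile: abc Message: ..."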
1305 */ 1306 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1307 final RAMQueueEntry re) { 1308 final StringBuilder sb = new StringBuilder(); 1309 sb.append("Most recent failed allocation after "); 1310 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1311 sb.append(" ms;"); 1312 if (re != null) { 1313 if (re.getData() instanceof HFileBlock) { 1314 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1315 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1316 final String tableName = Bytes.toString(fileContext.getTableName()); 1317 if (tableName != null) { 1318 sb.append(" Table: "); 1319 sb.append(tableName); 1320 } 1321 if (columnFamily != null) { 1322 sb.append(" CF: "); 1323 sb.append(columnFamily); 1324 } 1325 sb.append(" HFile: "); 1326 if (fileContext.getHFileName() != null) { 1327 sb.append(fileContext.getHFileName()); 1328 } else { 1329 sb.append(re.getKey()); 1330 } 1331 } else { 1332 sb.append(" HFile: "); 1333 sb.append(re.getKey()); 1334 } 1335 } 1336 sb.append(" Message: "); 1337 sb.append(fle.getMessage()); 1338 return sb.toString(); 1339 } 1340 1341 /** 1342 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1343 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1344 * references and we'll OOME. 1345 * @param entries Presumes list passed in here will be processed by this invocation only. No 1346 * interference expected. 1347 */ 1348 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1349 if (entries.isEmpty()) { 1350 return; 1351 } 1352 // This method is a little hard to follow. We run through the passed in entries and for each 1353 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1354 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1355 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1356 // filling ramCache. We do the clean up by again running through the passed in entries 1357 // doing extra work when we find a non-null bucketEntries corresponding entry. 1358 final int size = entries.size(); 1359 BucketEntry[] bucketEntries = new BucketEntry[size]; 1360 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1361 // when we go to add an entry by going around the loop again without upping the index. 1362 int index = 0; 1363 while (isCacheEnabled() && index < size) { 1364 RAMQueueEntry re = null; 1365 try { 1366 re = entries.get(index); 1367 if (re == null) { 1368 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1369 index++; 1370 continue; 1371 } 1372 // Reset the position for reuse. 1373 // It should be guaranteed that the data in the metaBuff has been transferred to the 1374 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1375 // transferred with our current IOEngines. Should take care, when we have new kinds of 1376 // IOEngine in the future. 1377 metaBuff.clear(); 1378 BucketEntry bucketEntry = 1379 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1380 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 
1381 bucketEntries[index] = bucketEntry;
1382 if (ioErrorStartTime > 0) {
1383 ioErrorStartTime = -1;
1384 }
1385 index++;
1386 } catch (BucketAllocatorException fle) {
1387 long currTs = EnvironmentEdgeManager.currentTime();
1388 cacheStats.allocationFailed(); // Record the warning.
1389 if (
1390 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
1391 ) {
1392 LOG.warn(getAllocationFailWarningMessage(fle, re));
1393 allocFailLogPrevTs = currTs;
1394 }
1395 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1396 bucketEntries[index] = null;
1397 index++;
1398 } catch (CacheFullException cfe) {
1399 // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1400 if (!freeInProgress) {
1401 freeSpace("Full!");
1402 } else {
1403 Thread.sleep(50);
1404 }
1405 } catch (IOException ioex) {
1406 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1407 LOG.error("Failed writing to bucket cache", ioex);
1408 checkIOErrorIsTolerated();
1409 }
1410 }
1411
1412 // Make sure data pages are written on media before we update maps.
1413 try {
1414 ioEngine.sync();
1415 } catch (IOException ioex) {
1416 LOG.error("Failed syncing IO engine", ioex);
1417 checkIOErrorIsTolerated();
1418 // Since we failed sync, free the blocks in bucket allocator
1419 for (int i = 0; i < entries.size(); ++i) {
1420 BucketEntry bucketEntry = bucketEntries[i];
1421 if (bucketEntry != null) {
1422 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
1423 bucketEntries[i] = null;
1424 }
1425 }
1426 }
1427
1428 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
1429 // success or error.
1430 for (int i = 0; i < size; ++i) {
1431 BlockCacheKey key = entries.get(i).getKey();
1432 // Only add if non-null entry.
1433 if (bucketEntries[i] != null) {
1434 putIntoBackingMap(key, bucketEntries[i]);
1435 if (ioEngine.isPersistent()) {
1436 setCacheInconsistent(true);
1437 }
1438 }
1439 // Always remove from ramCache even if we failed adding it to the block cache above.
1440 boolean existed = ramCache.remove(key, re -> {
1441 if (re != null) {
1442 heapSize.add(-1 * re.getData().heapSize());
1443 }
1444 });
1445 if (!existed && bucketEntries[i] != null) {
1446 // Block should have already been evicted. Remove it and free space.
1447 final BucketEntry bucketEntry = bucketEntries[i];
1448 bucketEntry.withWriteLock(offsetLock, () -> {
1449 if (backingMap.remove(key, bucketEntry)) {
1450 blockEvicted(key, bucketEntry, false, false);
1451 }
1452 return null;
1453 });
1454 }
1455 }
1456
1457 long used = bucketAllocator.getUsedSize();
1458 if (used > acceptableSize()) {
1459 freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1460 }
1461 return;
1462 }
1463
1464 /**
1465 * Blocks until elements available in {@code q} then tries to grab as many as possible before
1466 * returning.
1467 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1468 *                   in case.
1469 * @param q The queue to take from.
1470 * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1471 */
1472 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1473 List<RAMQueueEntry> receptacle) throws InterruptedException {
1474 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1475 // ok even if list grew to accommodate thousands.
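 // For illustration, the writer thread reuses the same list instance across iterations
 // (see WriterThread#run above):
 //   entries = getRAMQueueEntries(inputQueue, entries);
 //   doDrain(entries, metaBuff);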
1476 receptacle.clear(); 1477 receptacle.add(q.take()); 1478 q.drainTo(receptacle); 1479 return receptacle; 1480 } 1481 1482 /** 1483 * @see #retrieveFromFile(int[]) 1484 */ 1485 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1486 justification = "false positive, try-with-resources ensures close is called.") 1487 void persistToFile() throws IOException { 1488 LOG.debug("Thread {} started persisting bucket cache to file", 1489 Thread.currentThread().getName()); 1490 if (!isCachePersistent()) { 1491 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1492 } 1493 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1494 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1495 LOG.debug("Persist in new chunked persistence format."); 1496 1497 persistChunkedBackingMap(fos); 1498 1499 LOG.debug( 1500 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1501 + " file name: {}", 1502 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1503 } catch (IOException e) { 1504 LOG.error("Failed to persist bucket cache to file", e); 1505 throw e; 1506 } catch (Throwable e) { 1507 LOG.error("Failed during persist bucket cache to file: ", e); 1508 throw e; 1509 } 1510 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1511 Thread.currentThread().getName()); 1512 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1513 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1514 + "RS crashes/restarts before we successfully checkpoint again."); 1515 } 1516 } 1517 1518 public boolean isCachePersistent() { 1519 return ioEngine.isPersistent() && persistencePath != null; 1520 } 1521 1522 @Override 1523 public Optional<Map<String, Long>> getRegionCachedInfo() { 1524 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1525 } 1526 1527 /** 1528 * @see #persistToFile() 1529 */ 1530 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1531 LOG.info("Started retrieving bucket cache from file"); 1532 File persistenceFile = new File(persistencePath); 1533 if (!persistenceFile.exists()) { 1534 LOG.warn("Persistence file missing! " 1535 + "It's ok if it's first run after enabling persistent cache."); 1536 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1537 blockNumber.add(backingMap.size()); 1538 backingMapValidated.set(true); 1539 return; 1540 } 1541 assert !isCacheEnabled(); 1542 1543 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1544 int pblen = ProtobufMagic.lengthOfPBMagic(); 1545 byte[] pbuf = new byte[pblen]; 1546 IOUtils.readFully(in, pbuf, 0, pblen); 1547 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1548 LOG.info("Reading old format of persistence."); 1549 // The old non-chunked version of backing map persistence. 1550 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1551 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1552 // The new persistence format of chunked persistence. 1553 LOG.info("Reading new chunked format of persistence."); 1554 retrieveChunkedBackingMap(in); 1555 } else { 1556 // In 3.0 we have enough flexibility to dump the old cache data. 1557 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1558 throw new IOException( 1559 "Persistence file does not start with protobuf magic number. 
" + persistencePath); 1560 } 1561 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1562 blockNumber.add(backingMap.size()); 1563 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1564 } 1565 } 1566 1567 private void updateRegionSizeMapWhileRetrievingFromFile() { 1568 // Update the regionCachedSize with the region size while restarting the region server 1569 if (LOG.isDebugEnabled()) { 1570 LOG.debug("Updating region size map after retrieving cached file list"); 1571 dumpPrefetchList(); 1572 } 1573 regionCachedSize.clear(); 1574 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1575 // Get the region name for each file 1576 String regionEncodedName = hFileSize.getFirst(); 1577 long cachedFileSize = hFileSize.getSecond(); 1578 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1579 (oldpf, fileSize) -> oldpf + fileSize); 1580 }); 1581 } 1582 1583 private void dumpPrefetchList() { 1584 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1585 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1586 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1587 } 1588 } 1589 1590 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1591 throws IOException { 1592 if (capacitySize != cacheCapacity) { 1593 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1594 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1595 } 1596 if (!ioEngine.getClass().getName().equals(ioclass)) { 1597 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1598 + ioEngine.getClass().getName()); 1599 } 1600 if (!backingMap.getClass().getName().equals(mapclass)) { 1601 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1602 + backingMap.getClass().getName()); 1603 } 1604 } 1605 1606 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1607 try { 1608 if (proto.hasChecksum()) { 1609 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1610 algorithm); 1611 } 1612 backingMapValidated.set(true); 1613 } catch (IOException e) { 1614 LOG.warn("Checksum for cache file failed. " 1615 + "We need to validate each cache key in the backing map. " 1616 + "This may take some time, so we'll do it in a background thread,"); 1617 1618 Runnable cacheValidator = () -> { 1619 while (bucketAllocator == null) { 1620 try { 1621 Thread.sleep(50); 1622 } catch (InterruptedException ex) { 1623 throw new RuntimeException(ex); 1624 } 1625 } 1626 long startTime = EnvironmentEdgeManager.currentTime(); 1627 int totalKeysOriginally = backingMap.size(); 1628 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1629 try { 1630 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1631 } catch (IOException e1) { 1632 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1633 evictBlock(keyEntry.getKey()); 1634 fileNotFullyCached(keyEntry.getKey().getHfileName()); 1635 } 1636 } 1637 backingMapValidated.set(true); 1638 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. 
This took {}ms.", 1639 totalKeysOriginally, backingMap.size(), 1640 (EnvironmentEdgeManager.currentTime() - startTime)); 1641 }; 1642 Thread t = new Thread(cacheValidator); 1643 t.setDaemon(true); 1644 t.start(); 1645 } 1646 } 1647 1648 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1649 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1650 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1651 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1652 backingMap.putAll(pair2.getFirst()); 1653 blocksByHFile.addAll(pair2.getSecond()); 1654 } 1655 1656 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1657 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1658 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1659 this::createRecycler); 1660 backingMap = pair.getFirst(); 1661 blocksByHFile = pair.getSecond(); 1662 fullyCachedFiles.clear(); 1663 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1664 1665 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1666 fullyCachedFiles.size()); 1667 1668 verifyFileIntegrity(proto); 1669 updateRegionSizeMapWhileRetrievingFromFile(); 1670 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1671 } 1672 1673 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1674 LOG.debug( 1675 "persistToFile: before persisting backing map size: {}, " 1676 + "fullycachedFiles size: {}, chunkSize: {}", 1677 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1678 1679 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1680 1681 LOG.debug( 1682 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1683 backingMap.size(), fullyCachedFiles.size()); 1684 } 1685 1686 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1687 1688 // Read the first chunk that has all the details. 1689 BucketCacheProtos.BucketCacheEntry cacheEntry = 1690 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1691 1692 fullyCachedFiles.clear(); 1693 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1694 1695 backingMap.clear(); 1696 blocksByHFile.clear(); 1697 1698 // Read the backing map entries in batches. 1699 int numChunks = 0; 1700 while (in.available() > 0) { 1701 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1702 cacheEntry.getDeserializersMap()); 1703 numChunks++; 1704 } 1705 1706 LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size()); 1707 verifyFileIntegrity(cacheEntry); 1708 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1709 cacheEntry.getMapClass()); 1710 updateRegionSizeMapWhileRetrievingFromFile(); 1711 } 1712 1713 /** 1714 * Check whether we tolerate IO error this time. 
If the duration of IOEngine throwing errors 1715 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1716 */ 1717 private void checkIOErrorIsTolerated() { 1718 long now = EnvironmentEdgeManager.currentTime(); 1719 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1720 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1721 if (ioErrorStartTimeTmp > 0) { 1722 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1723 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1724 + "ms, disabling cache, please check your IOEngine"); 1725 disableCache(); 1726 } 1727 } else { 1728 this.ioErrorStartTime = now; 1729 } 1730 } 1731 1732 /** 1733 * Used to shut down the cache -or- turn it off in the case of something broken. 1734 */ 1735 private void disableCache() { 1736 if (!isCacheEnabled()) { 1737 return; 1738 } 1739 LOG.info("Disabling cache"); 1740 cacheState = CacheState.DISABLED; 1741 ioEngine.shutdown(); 1742 this.scheduleThreadPool.shutdown(); 1743 for (int i = 0; i < writerThreads.length; ++i) 1744 writerThreads[i].interrupt(); 1745 this.ramCache.clear(); 1746 if (!ioEngine.isPersistent() || persistencePath == null) { 1747 // If persistent ioengine and a path, we will serialize out the backingMap. 1748 this.backingMap.clear(); 1749 this.blocksByHFile.clear(); 1750 this.fullyCachedFiles.clear(); 1751 this.regionCachedSize.clear(); 1752 } 1753 } 1754 1755 private void join() throws InterruptedException { 1756 for (int i = 0; i < writerThreads.length; ++i) 1757 writerThreads[i].join(); 1758 } 1759 1760 @Override 1761 public void shutdown() { 1762 disableCache(); 1763 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1764 + persistencePath); 1765 if (ioEngine.isPersistent() && persistencePath != null) { 1766 try { 1767 join(); 1768 if (cachePersister != null) { 1769 LOG.info("Shutting down cache persister thread."); 1770 cachePersister.shutdown(); 1771 while (cachePersister.isAlive()) { 1772 Thread.sleep(10); 1773 } 1774 } 1775 persistToFile(); 1776 } catch (IOException ex) { 1777 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1778 } catch (InterruptedException e) { 1779 LOG.warn("Failed to persist data on exit", e); 1780 } 1781 } 1782 } 1783 1784 /** 1785 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances 1786 * on different UT methods. 
1787 */ 1788 @Override 1789 protected void finalize() { 1790 if (cachePersister != null && !cachePersister.isInterrupted()) { 1791 cachePersister.interrupt(); 1792 } 1793 } 1794 1795 @Override 1796 public CacheStats getStats() { 1797 return cacheStats; 1798 } 1799 1800 public BucketAllocator getAllocator() { 1801 return this.bucketAllocator; 1802 } 1803 1804 @Override 1805 public long heapSize() { 1806 return this.heapSize.sum(); 1807 } 1808 1809 @Override 1810 public long size() { 1811 return this.realCacheSize.sum(); 1812 } 1813 1814 @Override 1815 public long getCurrentDataSize() { 1816 return size(); 1817 } 1818 1819 @Override 1820 public long getFreeSize() { 1821 if (!isCacheInitialized("BucketCache:getFreeSize")) { 1822 return 0; 1823 } 1824 return this.bucketAllocator.getFreeSize(); 1825 } 1826 1827 @Override 1828 public long getBlockCount() { 1829 return this.blockNumber.sum(); 1830 } 1831 1832 @Override 1833 public long getDataBlockCount() { 1834 return getBlockCount(); 1835 } 1836 1837 @Override 1838 public long getCurrentSize() { 1839 if (!isCacheInitialized("BucketCache::getCurrentSize")) { 1840 return 0; 1841 } 1842 return this.bucketAllocator.getUsedSize(); 1843 } 1844 1845 protected String getAlgorithm() { 1846 return algorithm; 1847 } 1848 1849 /** 1850 * Evicts all blocks for a specific HFile. 1851 * <p> 1852 * This is used for evict-on-close to remove all blocks of a specific HFile. 1853 * @return the number of blocks evicted 1854 */ 1855 @Override 1856 public int evictBlocksByHfileName(String hfileName) { 1857 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); 1858 } 1859 1860 @Override 1861 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { 1862 fileNotFullyCached(hfileName); 1863 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); 1864 LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), 1865 hfileName, initOffset, endOffset); 1866 int numEvicted = 0; 1867 for (BlockCacheKey key : keySet) { 1868 if (evictBlock(key)) { 1869 ++numEvicted; 1870 } 1871 } 1872 return numEvicted; 1873 } 1874 1875 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) { 1876 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, 1877 new BlockCacheKey(hfileName, end), true); 1878 } 1879 1880 /** 1881 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1882 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1883 * number of elements out of each according to configuration parameters and their relative sizes. 1884 */ 1885 private class BucketEntryGroup { 1886 1887 private CachedEntryQueue queue; 1888 private long totalSize = 0; 1889 private long bucketSize; 1890 1891 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1892 this.bucketSize = bucketSize; 1893 queue = new CachedEntryQueue(bytesToFree, blockSize); 1894 totalSize = 0; 1895 } 1896 1897 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1898 totalSize += block.getValue().getLength(); 1899 queue.add(block); 1900 } 1901 1902 public long free(long toFree) { 1903 Map.Entry<BlockCacheKey, BucketEntry> entry; 1904 long freedBytes = 0; 1905 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1906 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
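 // Poll candidate entries off the queue and evict them until toFree bytes have been released.
 // evictBucketEntryIfNoRpcReferenced skips blocks still referenced by in-flight RPCs, so the
 // amount actually freed may fall short of what was requested (see the TODO above).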
1907 while ((entry = queue.pollLast()) != null) { 1908 BlockCacheKey blockCacheKey = entry.getKey(); 1909 BucketEntry be = entry.getValue(); 1910 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1911 freedBytes += be.getLength(); 1912 } 1913 if (freedBytes >= toFree) { 1914 return freedBytes; 1915 } 1916 } 1917 return freedBytes; 1918 } 1919 1920 public long overflow() { 1921 return totalSize - bucketSize; 1922 } 1923 1924 public long totalSize() { 1925 return totalSize; 1926 } 1927 } 1928 1929 /** 1930 * Block Entry stored in the memory with key,data and so on 1931 */ 1932 static class RAMQueueEntry { 1933 private final BlockCacheKey key; 1934 private final Cacheable data; 1935 private long accessCounter; 1936 private boolean inMemory; 1937 private boolean isCachePersistent; 1938 1939 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1940 boolean isCachePersistent) { 1941 this.key = bck; 1942 this.data = data; 1943 this.accessCounter = accessCounter; 1944 this.inMemory = inMemory; 1945 this.isCachePersistent = isCachePersistent; 1946 } 1947 1948 public Cacheable getData() { 1949 return data; 1950 } 1951 1952 public BlockCacheKey getKey() { 1953 return key; 1954 } 1955 1956 public void access(long accessCounter) { 1957 this.accessCounter = accessCounter; 1958 } 1959 1960 private ByteBuffAllocator getByteBuffAllocator() { 1961 if (data instanceof HFileBlock) { 1962 return ((HFileBlock) data).getByteBuffAllocator(); 1963 } 1964 return ByteBuffAllocator.HEAP; 1965 } 1966 1967 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1968 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1969 ByteBuffer metaBuff) throws IOException { 1970 int len = data.getSerializedLength(); 1971 // This cacheable thing can't be serialized 1972 if (len == 0) { 1973 return null; 1974 } 1975 if (isCachePersistent && data instanceof HFileBlock) { 1976 len += Long.BYTES; // we need to record the cache time for consistency check in case of 1977 // recovery 1978 } 1979 long offset = alloc.allocateBlock(len); 1980 boolean succ = false; 1981 BucketEntry bucketEntry = null; 1982 try { 1983 int diskSizeWithHeader = (data instanceof HFileBlock) 1984 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 1985 : data.getSerializedLength(); 1986 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 1987 createRecycler, getByteBuffAllocator()); 1988 bucketEntry.setDeserializerReference(data.getDeserializer()); 1989 if (data instanceof HFileBlock) { 1990 // If an instance of HFileBlock, save on some allocations. 1991 HFileBlock block = (HFileBlock) data; 1992 ByteBuff sliceBuf = block.getBufferReadOnly(); 1993 block.getMetaData(metaBuff); 1994 // adds the cache time prior to the block and metadata part 1995 if (isCachePersistent) { 1996 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 1997 buffer.putLong(bucketEntry.getCachedTime()); 1998 buffer.rewind(); 1999 ioEngine.write(buffer, offset); 2000 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 2001 } else { 2002 ioEngine.write(sliceBuf, offset); 2003 } 2004 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 2005 } else { 2006 // Only used for testing. 
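 // Serialize the whole cacheable into a temporary on-heap buffer and write it out in one shot.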
2007 ByteBuffer bb = ByteBuffer.allocate(len); 2008 data.serialize(bb, true); 2009 ioEngine.write(bb, offset); 2010 } 2011 succ = true; 2012 } finally { 2013 if (!succ) { 2014 alloc.freeBlock(offset, len); 2015 } 2016 } 2017 realCacheSize.add(len); 2018 return bucketEntry; 2019 } 2020 } 2021 2022 /** 2023 * Only used in test 2024 */ 2025 void stopWriterThreads() throws InterruptedException { 2026 for (WriterThread writerThread : writerThreads) { 2027 writerThread.disableWriter(); 2028 writerThread.interrupt(); 2029 writerThread.join(); 2030 } 2031 } 2032 2033 @Override 2034 public Iterator<CachedBlock> iterator() { 2035 // Don't bother with ramcache since stuff is in here only a little while. 2036 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 2037 return new Iterator<CachedBlock>() { 2038 private final long now = System.nanoTime(); 2039 2040 @Override 2041 public boolean hasNext() { 2042 return i.hasNext(); 2043 } 2044 2045 @Override 2046 public CachedBlock next() { 2047 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 2048 return new CachedBlock() { 2049 @Override 2050 public String toString() { 2051 return BlockCacheUtil.toString(this, now); 2052 } 2053 2054 @Override 2055 public BlockPriority getBlockPriority() { 2056 return e.getValue().getPriority(); 2057 } 2058 2059 @Override 2060 public BlockType getBlockType() { 2061 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 2062 return null; 2063 } 2064 2065 @Override 2066 public long getOffset() { 2067 return e.getKey().getOffset(); 2068 } 2069 2070 @Override 2071 public long getSize() { 2072 return e.getValue().getLength(); 2073 } 2074 2075 @Override 2076 public long getCachedTime() { 2077 return e.getValue().getCachedTime(); 2078 } 2079 2080 @Override 2081 public String getFilename() { 2082 return e.getKey().getHfileName(); 2083 } 2084 2085 @Override 2086 public int compareTo(CachedBlock other) { 2087 int diff = this.getFilename().compareTo(other.getFilename()); 2088 if (diff != 0) return diff; 2089 2090 diff = Long.compare(this.getOffset(), other.getOffset()); 2091 if (diff != 0) return diff; 2092 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 2093 throw new IllegalStateException( 2094 "" + this.getCachedTime() + ", " + other.getCachedTime()); 2095 } 2096 return Long.compare(other.getCachedTime(), this.getCachedTime()); 2097 } 2098 2099 @Override 2100 public int hashCode() { 2101 return e.getKey().hashCode(); 2102 } 2103 2104 @Override 2105 public boolean equals(Object obj) { 2106 if (obj instanceof CachedBlock) { 2107 CachedBlock cb = (CachedBlock) obj; 2108 return compareTo(cb) == 0; 2109 } else { 2110 return false; 2111 } 2112 } 2113 }; 2114 } 2115 2116 @Override 2117 public void remove() { 2118 throw new UnsupportedOperationException(); 2119 } 2120 }; 2121 } 2122 2123 @Override 2124 public BlockCache[] getBlockCaches() { 2125 return null; 2126 } 2127 2128 public int getRpcRefCount(BlockCacheKey cacheKey) { 2129 BucketEntry bucketEntry = backingMap.get(cacheKey); 2130 if (bucketEntry != null) { 2131 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1);
2132 }
2133 return 0;
2134 }
2135
2136 float getAcceptableFactor() {
2137 return acceptableFactor;
2138 }
2139
2140 float getMinFactor() {
2141 return minFactor;
2142 }
2143
2144 float getExtraFreeFactor() {
2145 return extraFreeFactor;
2146 }
2147
2148 float getSingleFactor() {
2149 return singleFactor;
2150 }
2151
2152 float getMultiFactor() {
2153 return multiFactor;
2154 }
2155
2156 float getMemoryFactor() {
2157 return memoryFactor;
2158 }
2159
2160 public String getPersistencePath() {
2161 return persistencePath;
2162 }
2163
2164 /**
2165 * Wraps the delegate ConcurrentMap while maintaining the reference count of each block it holds.
2166 */
2167 static class RAMCache {
2168 /**
2169 * The map is defined as a {@link ConcurrentHashMap} explicitly here, because in
2170 * {@link RAMCache#get(BlockCacheKey)} and
2171 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee
2172 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
2173 * func method must execute exactly once, and only while the key is present (or absent) and under
2174 * the lock context; otherwise the reference count of the block will be messed up. Notice that
2175 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
2176 */
2177 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
2178
2179 public boolean containsKey(BlockCacheKey key) {
2180 return delegate.containsKey(key);
2181 }
2182
2183 public RAMQueueEntry get(BlockCacheKey key) {
2184 return delegate.computeIfPresent(key, (k, re) -> {
2185 // It'll be referenced by RPC, so retain atomically here. If the get and the retain were not
2186 // atomic, another thread could remove and release the block, and retaining in this thread
2187 // could then retain a block with refCnt=0, which is disallowed. (see HBASE-22422)
2188 re.getData().retain();
2189 return re;
2190 });
2191 }
2192
2193 /**
2194 * Return the previous associated value, or null if absent. It has the same meaning as
2195 * {@link ConcurrentMap#putIfAbsent(Object, Object)}
2196 */
2197 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
2198 AtomicBoolean absent = new AtomicBoolean(false);
2199 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
2200 // The RAMCache now references this entry, so its reference count should be incremented.
2201 entry.getData().retain();
2202 absent.set(true);
2203 return entry;
2204 });
2205 return absent.get() ? null : re;
2206 }
2207
2208 public boolean remove(BlockCacheKey key) {
2209 return remove(key, re -> {
2210 });
2211 }
2212
2213 /**
2214 * A {@link Consumer} is taken here because once the removed entry releases its reference count,
2215 * its ByteBuffers may be recycled, and accessing them outside this method would throw an
2216 * exception. The consumer gets access to the entry being removed before its reference count is released.
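 * <p>
 * For illustration, this is how the write path ({@code doDrain}) uses this overload to update
 * its heap-size accounting before the removed entry's buffers are released:
 *
 * <pre>{@code
 * ramCache.remove(key, re -> {
 *   if (re != null) {
 *     heapSize.add(-1 * re.getData().heapSize());
 *   }
 * });
 * }</pre>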
2217 * Notice, don't change its reference count in the {@link Consumer} 2218 */ 2219 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 2220 RAMQueueEntry previous = delegate.remove(key); 2221 action.accept(previous); 2222 if (previous != null) { 2223 previous.getData().release(); 2224 } 2225 return previous != null; 2226 } 2227 2228 public boolean isEmpty() { 2229 return delegate.isEmpty(); 2230 } 2231 2232 public void clear() { 2233 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 2234 while (it.hasNext()) { 2235 RAMQueueEntry re = it.next().getValue(); 2236 it.remove(); 2237 re.getData().release(); 2238 } 2239 } 2240 2241 public boolean hasBlocksForFile(String fileName) { 2242 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 2243 .findFirst().isPresent(); 2244 } 2245 } 2246 2247 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 2248 return backingMap; 2249 } 2250 2251 public AtomicBoolean getBackingMapValidated() { 2252 return backingMapValidated; 2253 } 2254 2255 @Override 2256 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2257 return Optional.of(fullyCachedFiles); 2258 } 2259 2260 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2261 if (cacheConf.getBlockCache().isPresent()) { 2262 BlockCache bc = cacheConf.getBlockCache().get(); 2263 if (bc instanceof CombinedBlockCache) { 2264 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2265 if (l2 instanceof BucketCache) { 2266 return Optional.of((BucketCache) l2); 2267 } 2268 } else if (bc instanceof BucketCache) { 2269 return Optional.of((BucketCache) bc); 2270 } 2271 } 2272 return Optional.empty(); 2273 } 2274 2275 @Override 2276 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2277 long size) { 2278 // block eviction may be happening in the background as prefetch runs, 2279 // so we need to count all blocks for this file in the backing map under 2280 // a read lock for the block offset 2281 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2282 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2283 fileName, totalBlockCount, dataBlockCount); 2284 try { 2285 final MutableInt count = new MutableInt(); 2286 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2287 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2288 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2289 result = getAllCacheKeysForFile( 2290 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2291 Long.MAX_VALUE); 2292 } 2293 result.stream().forEach(entry -> { 2294 LOG.debug("found block for file {} in the backing map. 
Acquiring read lock for offset {}", 2295 fileName.getName(), entry.getOffset()); 2296 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2297 lock.readLock().lock(); 2298 locks.add(lock); 2299 if (backingMap.containsKey(entry) && entry.getBlockType() == BlockType.DATA) { 2300 count.increment(); 2301 } 2302 }); 2303 int metaCount = totalBlockCount - dataBlockCount; 2304 // BucketCache would only have data blocks 2305 if (dataBlockCount == count.getValue()) { 2306 LOG.debug("File {} has now been fully cached.", fileName); 2307 fileCacheCompleted(fileName, size); 2308 } else { 2309 LOG.debug( 2310 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2311 + "Total data blocks for file: {}. " 2312 + "Checking for blocks pending cache in cache writer queue.", 2313 fileName, count.getValue(), dataBlockCount); 2314 if (ramCache.hasBlocksForFile(fileName.getName())) { 2315 for (ReentrantReadWriteLock lock : locks) { 2316 lock.readLock().unlock(); 2317 } 2318 locks.clear(); 2319 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2320 + "and try the verification again.", fileName); 2321 Thread.sleep(100); 2322 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2323 } else if ( 2324 (getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE).size() - metaCount) 2325 == dataBlockCount 2326 ) { 2327 LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " 2328 + "the cache write queue but we now found that total cached blocks for file {} " 2329 + "is equal to data block count.", count, dataBlockCount, fileName.getName()); 2330 fileCacheCompleted(fileName, size); 2331 } else { 2332 LOG.info("We found only {} data blocks cached from a total of {} for file {}, " 2333 + "but no blocks pending caching. Maybe cache is full or evictions " 2334 + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName); 2335 } 2336 } 2337 } catch (InterruptedException e) { 2338 throw new RuntimeException(e); 2339 } finally { 2340 for (ReentrantReadWriteLock lock : locks) { 2341 lock.readLock().unlock(); 2342 } 2343 } 2344 } 2345 2346 @Override 2347 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2348 if (!isCacheInitialized("blockFitsIntoTheCache")) { 2349 return Optional.of(false); 2350 } 2351 2352 long currentUsed = bucketAllocator.getUsedSize(); 2353 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2354 return Optional.of(result); 2355 } 2356 2357 @Override 2358 public Optional<Boolean> shouldCacheFile(String fileName) { 2359 // if we don't have the file in fullyCachedFiles, we should cache it 2360 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2361 } 2362 2363 @Override 2364 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2365 boolean foundKey = backingMap.containsKey(key); 2366 // if there's no entry for the key itself, we need to check if this key is for a reference, 2367 // and if so, look for a block from the referenced file using this getBlockForReference method. 2368 return Optional.of(foundKey ? true : getBlockForReference(key) != null); 2369 } 2370 2371 @Override 2372 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2373 BucketEntry entry = backingMap.get(key); 2374 if (entry == null) { 2375 // the key might be for a reference tha we had found the block from the referenced file in 2376 // the cache when we first tried to cache it. 
2377 entry = getBlockForReference(key);
2378 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader());
2379 } else {
2380 return Optional.of(entry.getOnDiskSizeWithHeader());
2381 }
2382
2383 }
2384
2385 boolean isCacheInitialized(String api) {
2386 if (cacheState == CacheState.INITIALIZING) {
2387 LOG.warn("Bucket initialisation pending at {}", api);
2388 return false;
2389 }
2390 return true;
2391 }
2392
2393 @Override
2394 public boolean waitForCacheInitialization(long timeout) {
2395 try {
2396 while (cacheState == CacheState.INITIALIZING) {
2397 if (timeout <= 0) {
2398 break;
2399 }
2400 Thread.sleep(100);
2401 timeout -= 100;
2402 }
2403 } catch (InterruptedException e) { /* Thread.sleep may be interrupted; restore the flag and fall through to report the current state. */ Thread.currentThread().interrupt(); } finally {
2404 return isCacheEnabled();
2405 }
2406 }
2407}