001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.File; 021import java.io.FileInputStream; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.util.ArrayList; 026import java.util.Comparator; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.NavigableSet; 032import java.util.PriorityQueue; 033import java.util.Set; 034import java.util.concurrent.ArrayBlockingQueue; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.ConcurrentMap; 038import java.util.concurrent.ConcurrentSkipListSet; 039import java.util.concurrent.Executors; 040import java.util.concurrent.ScheduledExecutorService; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.atomic.LongAdder; 045import java.util.concurrent.locks.Lock; 046import java.util.concurrent.locks.ReentrantLock; 047import java.util.concurrent.locks.ReentrantReadWriteLock; 048import java.util.function.Consumer; 049import java.util.function.Function; 050import org.apache.commons.io.IOUtils; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.io.ByteBuffAllocator; 056import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 057import org.apache.hadoop.hbase.io.HeapSize; 058import org.apache.hadoop.hbase.io.hfile.BlockCache; 059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 061import org.apache.hadoop.hbase.io.hfile.BlockPriority; 062import org.apache.hadoop.hbase.io.hfile.BlockType; 063import org.apache.hadoop.hbase.io.hfile.CacheStats; 064import org.apache.hadoop.hbase.io.hfile.Cacheable; 065import org.apache.hadoop.hbase.io.hfile.CachedBlock; 066import org.apache.hadoop.hbase.io.hfile.HFileBlock; 067import org.apache.hadoop.hbase.io.hfile.HFileContext; 068import org.apache.hadoop.hbase.nio.ByteBuff; 069import org.apache.hadoop.hbase.nio.RefCnt; 070import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.IdReadWriteLock; 074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 075import org.apache.hadoop.util.StringUtils; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 081import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 082 083import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 084 085/** 086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 087 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 088 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 089 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 090 * {@link FileIOEngine} to store/read the block data. 091 * <p> 092 * Eviction is via a similar algorithm as used in 093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 094 * <p> 095 * BucketCache can be used as mainly a block cache (see 096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 097 * decrease CMS GC and heap fragmentation. 098 * <p> 099 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 100 * enlarge cache space via a victim cache. 101 */ 102@InterfaceAudience.Private 103public class BucketCache implements BlockCache, HeapSize { 104 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 105 106 /** Priority buckets config */ 107 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 108 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 109 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 110 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 111 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 112 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 113 114 /** Priority buckets */ 115 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 116 static final float DEFAULT_MULTI_FACTOR = 0.50f; 117 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 118 static final float DEFAULT_MIN_FACTOR = 0.85f; 119 120 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 121 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 122 123 // Number of blocks to clear for each of the bucket size that is full 124 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 125 126 /** Statistics thread */ 127 private static final int statThreadPeriod = 5 * 60; 128 129 final static int DEFAULT_WRITER_THREADS = 3; 130 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 131 132 // Store/read block data 133 transient final IOEngine ioEngine; 134 135 // Store the block in this map before writing it to cache 136 transient final RAMCache ramCache; 137 // In this map, store the block's meta data like offset, length 138 transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap; 139 140 /** 141 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 142 * that Bucket IO exceptions/errors don't bring down the HBase server. 143 */ 144 private volatile boolean cacheEnabled; 145 146 /** 147 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 148 * words, the work adding blocks to the BucketCache is divided up amongst the running 149 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 150 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 151 * then updates the ramCache and backingMap accordingly. 152 */ 153 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 154 transient final WriterThread[] writerThreads; 155 156 /** Volatile boolean to track if free space is in process or not */ 157 private volatile boolean freeInProgress = false; 158 private transient final Lock freeSpaceLock = new ReentrantLock(); 159 160 private final LongAdder realCacheSize = new LongAdder(); 161 private final LongAdder heapSize = new LongAdder(); 162 /** Current number of cached elements */ 163 private final LongAdder blockNumber = new LongAdder(); 164 165 /** Cache access count (sequential ID) */ 166 private final AtomicLong accessCount = new AtomicLong(); 167 168 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 169 170 /** 171 * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip 172 * some blocks when caching. If the flag is true, we will wait until blocks are flushed to 173 * IOEngine. 174 */ 175 boolean wait_when_cache = false; 176 177 private final BucketCacheStats cacheStats = new BucketCacheStats(); 178 179 private final String persistencePath; 180 private final long cacheCapacity; 181 /** Approximate block size */ 182 private final long blockSize; 183 184 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 185 private final int ioErrorsTolerationDuration; 186 // 1 min 187 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 188 189 // Start time of first IO error when reading or writing IO Engine, it will be 190 // reset after a successful read/write. 191 private volatile long ioErrorStartTime = -1; 192 193 /** 194 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 195 * this is to avoid freeing the block which is being read. 196 * <p> 197 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 198 */ 199 transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 200 201 private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> { 202 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 203 if (nameComparison != 0) { 204 return nameComparison; 205 } 206 return Long.compare(a.getOffset(), b.getOffset()); 207 }); 208 209 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 210 private transient final ScheduledExecutorService scheduleThreadPool = 211 Executors.newScheduledThreadPool(1, 212 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 213 214 // Allocate or free space for the block 215 private transient BucketAllocator bucketAllocator; 216 217 /** Acceptable size of cache (no evictions if size < acceptable) */ 218 private float acceptableFactor; 219 220 /** Minimum threshold of cache (when evicting, evict until size < min) */ 221 private float minFactor; 222 223 /** 224 * Free this floating point factor of extra blocks when evicting. For example free the number of 225 * blocks requested * (1 + extraFreeFactor) 226 */ 227 private float extraFreeFactor; 228 229 /** Single access bucket size */ 230 private float singleFactor; 231 232 /** Multiple access bucket size */ 233 private float multiFactor; 234 235 /** In-memory bucket size */ 236 private float memoryFactor; 237 238 private static final String FILE_VERIFY_ALGORITHM = 239 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 240 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 241 242 /** 243 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 244 * integrity, default algorithm is MD5 245 */ 246 private String algorithm; 247 248 /* Tracing failed Bucket Cache allocations. */ 249 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 250 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 251 252 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 253 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 254 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 255 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 256 } 257 258 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 259 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 260 Configuration conf) throws IOException { 261 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 262 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 263 this.writerThreads = new WriterThread[writerThreadNum]; 264 long blockNumCapacity = capacity / blockSize; 265 if (blockNumCapacity >= Integer.MAX_VALUE) { 266 // Enough for about 32TB of cache! 267 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 268 } 269 270 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 271 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 272 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 273 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 274 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 275 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 276 277 sanityCheckConfigs(); 278 279 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 280 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 281 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor); 282 283 this.cacheCapacity = capacity; 284 this.persistencePath = persistencePath; 285 this.blockSize = blockSize; 286 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 287 288 this.allocFailLogPrevTs = 0; 289 290 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 291 for (int i = 0; i < writerThreads.length; ++i) { 292 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 293 } 294 295 assert writerQueues.size() == writerThreads.length; 296 this.ramCache = new RAMCache(); 297 298 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 299 300 if (ioEngine.isPersistent() && persistencePath != null) { 301 try { 302 retrieveFromFile(bucketSizes); 303 } catch (IOException ioex) { 304 LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); 305 } 306 } 307 final String threadName = Thread.currentThread().getName(); 308 this.cacheEnabled = true; 309 for (int i = 0; i < writerThreads.length; ++i) { 310 writerThreads[i] = new WriterThread(writerQueues.get(i)); 311 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 312 writerThreads[i].setDaemon(true); 313 } 314 startWriterThreads(); 315 316 // Run the statistics thread periodically to print the cache statistics log 317 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 318 // every five minutes. 319 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 320 statThreadPeriod, TimeUnit.SECONDS); 321 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 322 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 323 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 324 + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 325 } 326 327 private void sanityCheckConfigs() { 328 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 329 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 330 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 331 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 332 Preconditions.checkArgument(minFactor <= acceptableFactor, 333 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 334 Preconditions.checkArgument(extraFreeFactor >= 0, 335 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 336 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 337 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 338 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 339 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 340 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 341 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 342 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 343 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 344 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 345 } 346 347 /** 348 * Called by the constructor to start the writer threads. Used by tests that need to override 349 * starting the threads. 350 */ 351 protected void startWriterThreads() { 352 for (WriterThread thread : writerThreads) { 353 thread.start(); 354 } 355 } 356 357 boolean isCacheEnabled() { 358 return this.cacheEnabled; 359 } 360 361 @Override 362 public long getMaxSize() { 363 return this.cacheCapacity; 364 } 365 366 public String getIoEngine() { 367 return ioEngine.toString(); 368 } 369 370 /** 371 * Get the IOEngine from the IO engine name nnn * @return the IOEngine n 372 */ 373 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 374 throws IOException { 375 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 376 // In order to make the usage simple, we only need the prefix 'files:' in 377 // document whether one or multiple file(s), but also support 'file:' for 378 // the compatibility 379 String[] filePaths = 380 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 381 return new FileIOEngine(capacity, persistencePath != null, filePaths); 382 } else if (ioEngineName.startsWith("offheap")) { 383 return new ByteBufferIOEngine(capacity); 384 } else if (ioEngineName.startsWith("mmap:")) { 385 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 386 } else if (ioEngineName.startsWith("pmem:")) { 387 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 388 // device. Since the persistent memory device has its own address space the contents 389 // mapped to this address space does not get swapped out like in the case of mmapping 390 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 391 // can be directly referred to without having to copy them onheap. Once the RPC is done, 392 // the blocks can be returned back as in case of ByteBufferIOEngine. 393 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 394 } else { 395 throw new IllegalArgumentException( 396 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 397 } 398 } 399 400 /** 401 * Cache the block with the specified name and buffer. 402 * @param cacheKey block's cache key 403 * @param buf block buffer 404 */ 405 @Override 406 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 407 cacheBlock(cacheKey, buf, false); 408 } 409 410 /** 411 * Cache the block with the specified name and buffer. 412 * @param cacheKey block's cache key 413 * @param cachedItem block buffer 414 * @param inMemory if block is in-memory 415 */ 416 @Override 417 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 418 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 419 } 420 421 /** 422 * Cache the block to ramCache 423 * @param cacheKey block's cache key 424 * @param cachedItem block buffer 425 * @param inMemory if block is in-memory 426 * @param wait if true, blocking wait when queue is full 427 */ 428 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 429 boolean wait) { 430 if (cacheEnabled) { 431 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 432 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 433 BucketEntry bucketEntry = backingMap.get(cacheKey); 434 if (bucketEntry != null && bucketEntry.isRpcRef()) { 435 // avoid replace when there are RPC refs for the bucket entry in bucket cache 436 return; 437 } 438 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 439 } 440 } else { 441 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 442 } 443 } 444 } 445 446 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 447 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 448 } 449 450 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 451 boolean inMemory, boolean wait) { 452 if (!cacheEnabled) { 453 return; 454 } 455 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 456 // Stuff the entry into the RAM cache so it can get drained to the persistent store 457 RAMQueueEntry re = 458 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 459 /** 460 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 461 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 462 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 463 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 464 * one, then the heap size will mess up (HBASE-20789) 465 */ 466 if (ramCache.putIfAbsent(cacheKey, re) != null) { 467 return; 468 } 469 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 470 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 471 boolean successfulAddition = false; 472 if (wait) { 473 try { 474 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 475 } catch (InterruptedException e) { 476 Thread.currentThread().interrupt(); 477 } 478 } else { 479 successfulAddition = bq.offer(re); 480 } 481 if (!successfulAddition) { 482 ramCache.remove(cacheKey); 483 cacheStats.failInsert(); 484 } else { 485 this.blockNumber.increment(); 486 this.heapSize.add(cachedItem.heapSize()); 487 } 488 } 489 490 /** 491 * Get the buffer of the block with the specified key. 492 * @param key block's cache key 493 * @param caching true if the caller caches blocks on cache misses 494 * @param repeat Whether this is a repeat lookup for the same block 495 * @param updateCacheMetrics Whether we should update cache metrics or not 496 * @return buffer of specified cache key, or null if not in cache 497 */ 498 @Override 499 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 500 boolean updateCacheMetrics) { 501 if (!cacheEnabled) { 502 return null; 503 } 504 RAMQueueEntry re = ramCache.get(key); 505 if (re != null) { 506 if (updateCacheMetrics) { 507 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 508 } 509 re.access(accessCount.incrementAndGet()); 510 return re.getData(); 511 } 512 BucketEntry bucketEntry = backingMap.get(key); 513 if (bucketEntry != null) { 514 long start = System.nanoTime(); 515 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 516 try { 517 lock.readLock().lock(); 518 // We can not read here even if backingMap does contain the given key because its offset 519 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 520 // existence here. 521 if (bucketEntry.equals(backingMap.get(key))) { 522 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 523 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 524 // the same BucketEntry, then all of the three will share the same refCnt. 525 Cacheable cachedBlock = ioEngine.read(bucketEntry); 526 if (ioEngine.usesSharedMemory()) { 527 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 528 // same RefCnt, do retain here, in order to count the number of RPC references 529 cachedBlock.retain(); 530 } 531 // Update the cache statistics. 532 if (updateCacheMetrics) { 533 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 534 cacheStats.ioHit(System.nanoTime() - start); 535 } 536 bucketEntry.access(accessCount.incrementAndGet()); 537 if (this.ioErrorStartTime > 0) { 538 ioErrorStartTime = -1; 539 } 540 return cachedBlock; 541 } 542 } catch (IOException ioex) { 543 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 544 checkIOErrorIsTolerated(); 545 } finally { 546 lock.readLock().unlock(); 547 } 548 } 549 if (!repeat && updateCacheMetrics) { 550 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 551 } 552 return null; 553 } 554 555 /** 556 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 557 */ 558 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 559 boolean evictedByEvictionProcess) { 560 bucketEntry.markAsEvicted(); 561 blocksByHFile.remove(cacheKey); 562 if (decrementBlockNumber) { 563 this.blockNumber.decrement(); 564 } 565 if (evictedByEvictionProcess) { 566 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 567 } 568 } 569 570 /** 571 * Free the {{@link BucketEntry} actually,which could only be invoked when the 572 * {@link BucketEntry#refCnt} becoming 0. 573 */ 574 void freeBucketEntry(BucketEntry bucketEntry) { 575 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 576 realCacheSize.add(-1 * bucketEntry.getLength()); 577 } 578 579 /** 580 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 581 * 1. Close an HFile, and clear all cached blocks. <br> 582 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 583 * <p> 584 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 585 * Here we evict the block from backingMap immediately, but only free the reference from bucket 586 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 587 * block, block can only be de-allocated when all of them release the block. 588 * <p> 589 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 590 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 591 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 592 * @param cacheKey Block to evict 593 * @return true to indicate whether we've evicted successfully or not. 594 */ 595 @Override 596 public boolean evictBlock(BlockCacheKey cacheKey) { 597 return doEvictBlock(cacheKey, null, false); 598 } 599 600 /** 601 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 602 * {@link BucketCache#ramCache}. <br/> 603 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 604 * {@link BucketEntry} could be removed. 605 * @param cacheKey {@link BlockCacheKey} to evict. 606 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 607 * @return true to indicate whether we've evicted successfully or not. 608 */ 609 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 610 boolean evictedByEvictionProcess) { 611 if (!cacheEnabled) { 612 return false; 613 } 614 boolean existedInRamCache = removeFromRamCache(cacheKey); 615 if (bucketEntry == null) { 616 bucketEntry = backingMap.get(cacheKey); 617 } 618 final BucketEntry bucketEntryToUse = bucketEntry; 619 620 if (bucketEntryToUse == null) { 621 if (existedInRamCache && evictedByEvictionProcess) { 622 cacheStats.evicted(0, cacheKey.isPrimary()); 623 } 624 return existedInRamCache; 625 } else { 626 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 627 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 628 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 629 return true; 630 } 631 return false; 632 }); 633 } 634 } 635 636 /** 637 * <pre> 638 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 639 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 640 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 641 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 642 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 643 * it is the return value of current {@link BucketCache#createRecycler} method. 644 * 645 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 646 * it is {@link ByteBuffAllocator#putbackBuffer}. 647 * </pre> 648 */ 649 private Recycler createRecycler(final BucketEntry bucketEntry) { 650 return () -> { 651 freeBucketEntry(bucketEntry); 652 return; 653 }; 654 } 655 656 /** 657 * NOTE: This method is only for test. 658 */ 659 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 660 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 661 if (bucketEntry == null) { 662 return false; 663 } 664 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 665 } 666 667 /** 668 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 669 * {@link BucketEntry#isRpcRef} is false. <br/> 670 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 671 * {@link BucketEntry} could be removed. 672 * @param blockCacheKey {@link BlockCacheKey} to evict. 673 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 674 * @return true to indicate whether we've evicted successfully or not. 675 */ 676 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 677 if (!bucketEntry.isRpcRef()) { 678 return doEvictBlock(blockCacheKey, bucketEntry, true); 679 } 680 return false; 681 } 682 683 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 684 return ramCache.remove(cacheKey, re -> { 685 if (re != null) { 686 this.blockNumber.decrement(); 687 this.heapSize.add(-1 * re.getData().heapSize()); 688 } 689 }); 690 } 691 692 /* 693 * Statistics thread. Periodically output cache statistics to the log. 694 */ 695 private static class StatisticsThread extends Thread { 696 private final BucketCache bucketCache; 697 698 public StatisticsThread(BucketCache bucketCache) { 699 super("BucketCacheStatsThread"); 700 setDaemon(true); 701 this.bucketCache = bucketCache; 702 } 703 704 @Override 705 public void run() { 706 bucketCache.logStats(); 707 } 708 } 709 710 public void logStats() { 711 long totalSize = bucketAllocator.getTotalSize(); 712 long usedSize = bucketAllocator.getUsedSize(); 713 long freeSize = totalSize - usedSize; 714 long cacheSize = getRealCacheSize(); 715 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 716 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 717 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 718 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 719 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 720 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 721 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 722 + (cacheStats.getHitCount() == 0 723 ? "0," 724 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 725 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 726 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 727 + (cacheStats.getHitCachingCount() == 0 728 ? "0," 729 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 730 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 731 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 732 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount()); 733 cacheStats.reset(); 734 735 bucketAllocator.logDebugStatistics(); 736 } 737 738 public long getRealCacheSize() { 739 return this.realCacheSize.sum(); 740 } 741 742 public long acceptableSize() { 743 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 744 } 745 746 long getPartitionSize(float partitionFactor) { 747 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 748 } 749 750 /** 751 * Return the count of bucketSizeinfos still need free space 752 */ 753 private int bucketSizesAboveThresholdCount(float minFactor) { 754 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 755 int fullCount = 0; 756 for (int i = 0; i < stats.length; i++) { 757 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 758 freeGoal = Math.max(freeGoal, 1); 759 if (stats[i].freeCount() < freeGoal) { 760 fullCount++; 761 } 762 } 763 return fullCount; 764 } 765 766 /** 767 * This method will find the buckets that are minimally occupied and are not reference counted and 768 * will free them completely without any constraint on the access times of the elements, and as a 769 * process will completely free at most the number of buckets passed, sometimes it might not due 770 * to changing refCounts 771 * @param completelyFreeBucketsNeeded number of buckets to free 772 **/ 773 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 774 if (completelyFreeBucketsNeeded != 0) { 775 // First we will build a set where the offsets are reference counted, usually 776 // this set is small around O(Handler Count) unless something else is wrong 777 Set<Integer> inUseBuckets = new HashSet<>(); 778 backingMap.forEach((k, be) -> { 779 if (be.isRpcRef()) { 780 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 781 } 782 }); 783 Set<Integer> candidateBuckets = 784 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 785 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 786 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 787 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 788 } 789 } 790 } 791 } 792 793 /** 794 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 795 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 796 * blocks evicted 797 * @param why Why we are being called 798 */ 799 void freeSpace(final String why) { 800 // Ensure only one freeSpace progress at a time 801 if (!freeSpaceLock.tryLock()) { 802 return; 803 } 804 try { 805 freeInProgress = true; 806 long bytesToFreeWithoutExtra = 0; 807 // Calculate free byte for each bucketSizeinfo 808 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 809 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 810 long[] bytesToFreeForBucket = new long[stats.length]; 811 for (int i = 0; i < stats.length; i++) { 812 bytesToFreeForBucket[i] = 0; 813 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 814 freeGoal = Math.max(freeGoal, 1); 815 if (stats[i].freeCount() < freeGoal) { 816 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 817 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 818 if (msgBuffer != null) { 819 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 820 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 821 } 822 } 823 } 824 if (msgBuffer != null) { 825 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 826 } 827 828 if (bytesToFreeWithoutExtra <= 0) { 829 return; 830 } 831 long currentSize = bucketAllocator.getUsedSize(); 832 long totalSize = bucketAllocator.getTotalSize(); 833 if (LOG.isDebugEnabled() && msgBuffer != null) { 834 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() 835 + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 836 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 837 + StringUtils.byteDesc(totalSize)); 838 } 839 840 long bytesToFreeWithExtra = 841 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 842 843 // Instantiate priority buckets 844 BucketEntryGroup bucketSingle = 845 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 846 BucketEntryGroup bucketMulti = 847 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 848 BucketEntryGroup bucketMemory = 849 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 850 851 // Scan entire map putting bucket entry into appropriate bucket entry 852 // group 853 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 854 switch (bucketEntryWithKey.getValue().getPriority()) { 855 case SINGLE: { 856 bucketSingle.add(bucketEntryWithKey); 857 break; 858 } 859 case MULTI: { 860 bucketMulti.add(bucketEntryWithKey); 861 break; 862 } 863 case MEMORY: { 864 bucketMemory.add(bucketEntryWithKey); 865 break; 866 } 867 } 868 } 869 870 PriorityQueue<BucketEntryGroup> bucketQueue = 871 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 872 873 bucketQueue.add(bucketSingle); 874 bucketQueue.add(bucketMulti); 875 bucketQueue.add(bucketMemory); 876 877 int remainingBuckets = bucketQueue.size(); 878 long bytesFreed = 0; 879 880 BucketEntryGroup bucketGroup; 881 while ((bucketGroup = bucketQueue.poll()) != null) { 882 long overflow = bucketGroup.overflow(); 883 if (overflow > 0) { 884 long bucketBytesToFree = 885 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 886 bytesFreed += bucketGroup.free(bucketBytesToFree); 887 } 888 remainingBuckets--; 889 } 890 891 // Check and free if there are buckets that still need freeing of space 892 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 893 bucketQueue.clear(); 894 remainingBuckets = 3; 895 896 bucketQueue.add(bucketSingle); 897 bucketQueue.add(bucketMulti); 898 bucketQueue.add(bucketMemory); 899 900 while ((bucketGroup = bucketQueue.poll()) != null) { 901 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 902 bytesFreed += bucketGroup.free(bucketBytesToFree); 903 remainingBuckets--; 904 } 905 } 906 907 // Even after the above free we might still need freeing because of the 908 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 909 // there might be some buckets where the occupancy is very sparse and thus are not 910 // yielding the free for the other bucket sizes, the fix for this to evict some 911 // of the buckets, we do this by evicting the buckets that are least fulled 912 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 913 914 if (LOG.isDebugEnabled()) { 915 long single = bucketSingle.totalSize(); 916 long multi = bucketMulti.totalSize(); 917 long memory = bucketMemory.totalSize(); 918 if (LOG.isDebugEnabled()) { 919 LOG.debug("Bucket cache free space completed; " + "freed=" 920 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 921 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 922 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 923 } 924 } 925 926 } catch (Throwable t) { 927 LOG.warn("Failed freeing space", t); 928 } finally { 929 cacheStats.evict(); 930 freeInProgress = false; 931 freeSpaceLock.unlock(); 932 } 933 } 934 935 // This handles flushing the RAM cache to IOEngine. 936 class WriterThread extends Thread { 937 private final BlockingQueue<RAMQueueEntry> inputQueue; 938 private volatile boolean writerEnabled = true; 939 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 940 941 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 942 super("BucketCacheWriterThread"); 943 this.inputQueue = queue; 944 } 945 946 // Used for test 947 void disableWriter() { 948 this.writerEnabled = false; 949 } 950 951 @Override 952 public void run() { 953 List<RAMQueueEntry> entries = new ArrayList<>(); 954 try { 955 while (cacheEnabled && writerEnabled) { 956 try { 957 try { 958 // Blocks 959 entries = getRAMQueueEntries(inputQueue, entries); 960 } catch (InterruptedException ie) { 961 if (!cacheEnabled || !writerEnabled) { 962 break; 963 } 964 } 965 doDrain(entries, metaBuff); 966 } catch (Exception ioe) { 967 LOG.error("WriterThread encountered error", ioe); 968 } 969 } 970 } catch (Throwable t) { 971 LOG.warn("Failed doing drain", t); 972 } 973 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 974 } 975 } 976 977 /** 978 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 979 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 980 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 981 * with the same cache key do the same thing for the same cache key, so if not evict the previous 982 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 983 * bucketAllocator do not free its memory. 984 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 985 * cacheKey, Cacheable newBlock) 986 * @param key Block cache key 987 * @param bucketEntry Bucket entry to put into backingMap. 988 */ 989 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 990 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 991 if (previousEntry != null && previousEntry != bucketEntry) { 992 previousEntry.withWriteLock(offsetLock, () -> { 993 blockEvicted(key, previousEntry, false, false); 994 return null; 995 }); 996 } 997 } 998 999 /** 1000 * Prepare and return a warning message for Bucket Allocator Exception 1001 * @param fle The exception 1002 * @param re The RAMQueueEntry for which the exception was thrown. 1003 * @return A warning message created from the input RAMQueueEntry object. 1004 */ 1005 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1006 final RAMQueueEntry re) { 1007 final StringBuilder sb = new StringBuilder(); 1008 sb.append("Most recent failed allocation after "); 1009 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1010 sb.append(" ms;"); 1011 if (re != null) { 1012 if (re.getData() instanceof HFileBlock) { 1013 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1014 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1015 final String tableName = Bytes.toString(fileContext.getTableName()); 1016 if (tableName != null && columnFamily != null) { 1017 sb.append(" Table: "); 1018 sb.append(tableName); 1019 sb.append(" CF: "); 1020 sb.append(columnFamily); 1021 sb.append(" HFile: "); 1022 sb.append(fileContext.getHFileName()); 1023 } 1024 } else { 1025 sb.append(" HFile: "); 1026 sb.append(re.getKey()); 1027 } 1028 } 1029 sb.append(" Message: "); 1030 sb.append(fle.getMessage()); 1031 return sb.toString(); 1032 } 1033 1034 /** 1035 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1036 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1037 * references and we'll OOME. 1038 * @param entries Presumes list passed in here will be processed by this invocation only. No 1039 * interference expected. 1040 */ 1041 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1042 if (entries.isEmpty()) { 1043 return; 1044 } 1045 // This method is a little hard to follow. We run through the passed in entries and for each 1046 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1047 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1048 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1049 // filling ramCache. We do the clean up by again running through the passed in entries 1050 // doing extra work when we find a non-null bucketEntries corresponding entry. 1051 final int size = entries.size(); 1052 BucketEntry[] bucketEntries = new BucketEntry[size]; 1053 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1054 // when we go to add an entry by going around the loop again without upping the index. 1055 int index = 0; 1056 while (cacheEnabled && index < size) { 1057 RAMQueueEntry re = null; 1058 try { 1059 re = entries.get(index); 1060 if (re == null) { 1061 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1062 index++; 1063 continue; 1064 } 1065 BlockCacheKey cacheKey = re.getKey(); 1066 if (ramCache.containsKey(cacheKey)) { 1067 blocksByHFile.add(cacheKey); 1068 } 1069 // Reset the position for reuse. 1070 // It should be guaranteed that the data in the metaBuff has been transferred to the 1071 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1072 // transferred with our current IOEngines. Should take care, when we have new kinds of 1073 // IOEngine in the future. 1074 metaBuff.clear(); 1075 BucketEntry bucketEntry = 1076 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1077 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 1078 bucketEntries[index] = bucketEntry; 1079 if (ioErrorStartTime > 0) { 1080 ioErrorStartTime = -1; 1081 } 1082 index++; 1083 } catch (BucketAllocatorException fle) { 1084 long currTs = EnvironmentEdgeManager.currentTime(); 1085 cacheStats.allocationFailed(); // Record the warning. 1086 if ( 1087 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1088 ) { 1089 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1090 allocFailLogPrevTs = currTs; 1091 } 1092 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1093 bucketEntries[index] = null; 1094 index++; 1095 } catch (CacheFullException cfe) { 1096 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1097 if (!freeInProgress) { 1098 freeSpace("Full!"); 1099 } else { 1100 Thread.sleep(50); 1101 } 1102 } catch (IOException ioex) { 1103 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1104 LOG.error("Failed writing to bucket cache", ioex); 1105 checkIOErrorIsTolerated(); 1106 } 1107 } 1108 1109 // Make sure data pages are written on media before we update maps. 1110 try { 1111 ioEngine.sync(); 1112 } catch (IOException ioex) { 1113 LOG.error("Failed syncing IO engine", ioex); 1114 checkIOErrorIsTolerated(); 1115 // Since we failed sync, free the blocks in bucket allocator 1116 for (int i = 0; i < entries.size(); ++i) { 1117 BucketEntry bucketEntry = bucketEntries[i]; 1118 if (bucketEntry != null) { 1119 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1120 bucketEntries[i] = null; 1121 } 1122 } 1123 } 1124 1125 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1126 // success or error. 1127 for (int i = 0; i < size; ++i) { 1128 BlockCacheKey key = entries.get(i).getKey(); 1129 // Only add if non-null entry. 1130 if (bucketEntries[i] != null) { 1131 putIntoBackingMap(key, bucketEntries[i]); 1132 } 1133 // Always remove from ramCache even if we failed adding it to the block cache above. 1134 boolean existed = ramCache.remove(key, re -> { 1135 if (re != null) { 1136 heapSize.add(-1 * re.getData().heapSize()); 1137 } 1138 }); 1139 if (!existed && bucketEntries[i] != null) { 1140 // Block should have already been evicted. Remove it and free space. 1141 final BucketEntry bucketEntry = bucketEntries[i]; 1142 bucketEntry.withWriteLock(offsetLock, () -> { 1143 if (backingMap.remove(key, bucketEntry)) { 1144 blockEvicted(key, bucketEntry, false, false); 1145 } 1146 return null; 1147 }); 1148 } 1149 } 1150 1151 long used = bucketAllocator.getUsedSize(); 1152 if (used > acceptableSize()) { 1153 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1154 } 1155 return; 1156 } 1157 1158 /** 1159 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1160 * returning. 1161 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1162 * in case. 1163 * @param q The queue to take from. 1164 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1165 */ 1166 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1167 List<RAMQueueEntry> receptacle) throws InterruptedException { 1168 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1169 // ok even if list grew to accommodate thousands. 1170 receptacle.clear(); 1171 receptacle.add(q.take()); 1172 q.drainTo(receptacle); 1173 return receptacle; 1174 } 1175 1176 /** 1177 * @see #retrieveFromFile(int[]) 1178 */ 1179 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1180 justification = "false positive, try-with-resources ensures close is called.") 1181 private void persistToFile() throws IOException { 1182 assert !cacheEnabled; 1183 if (!ioEngine.isPersistent()) { 1184 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1185 } 1186 try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { 1187 fos.write(ProtobufMagic.PB_MAGIC); 1188 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1189 } 1190 } 1191 1192 /** 1193 * @see #persistToFile() 1194 */ 1195 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1196 File persistenceFile = new File(persistencePath); 1197 if (!persistenceFile.exists()) { 1198 return; 1199 } 1200 assert !cacheEnabled; 1201 1202 try (FileInputStream in = deleteFileOnClose(persistenceFile)) { 1203 int pblen = ProtobufMagic.lengthOfPBMagic(); 1204 byte[] pbuf = new byte[pblen]; 1205 IOUtils.readFully(in, pbuf, 0, pblen); 1206 if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { 1207 // In 3.0 we have enough flexibility to dump the old cache data. 1208 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1209 throw new IOException( 1210 "Persistence file does not start with protobuf magic number. " + persistencePath); 1211 } 1212 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1213 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1214 blockNumber.add(backingMap.size()); 1215 } 1216 } 1217 1218 /** 1219 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1220 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1221 * 1222 * <pre> 1223 * File f = ... 1224 * try (FileInputStream fis = new FileInputStream(f)) { 1225 * // use the input stream 1226 * } finally { 1227 * if (!f.delete()) throw new IOException("failed to delete"); 1228 * } 1229 * </pre> 1230 * 1231 * @param file the file to read and delete 1232 * @return a FileInputStream for the given file 1233 * @throws IOException if there is a problem creating the stream 1234 */ 1235 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1236 return new FileInputStream(file) { 1237 private File myFile; 1238 1239 private FileInputStream init(File file) { 1240 myFile = file; 1241 return this; 1242 } 1243 1244 @Override 1245 public void close() throws IOException { 1246 // close() will be called during try-with-resources and it will be 1247 // called by finalizer thread during GC. To avoid double-free resource, 1248 // set myFile to null after the first call. 1249 if (myFile == null) { 1250 return; 1251 } 1252 1253 super.close(); 1254 if (!myFile.delete()) { 1255 throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); 1256 } 1257 myFile = null; 1258 } 1259 }.init(file); 1260 } 1261 1262 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1263 throws IOException { 1264 if (capacitySize != cacheCapacity) { 1265 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1266 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1267 } 1268 if (!ioEngine.getClass().getName().equals(ioclass)) { 1269 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1270 + ioEngine.getClass().getName()); 1271 } 1272 if (!backingMap.getClass().getName().equals(mapclass)) { 1273 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1274 + backingMap.getClass().getName()); 1275 } 1276 } 1277 1278 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1279 if (proto.hasChecksum()) { 1280 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1281 algorithm); 1282 } else { 1283 // if has not checksum, it means the persistence file is old format 1284 LOG.info("Persistent file is old format, it does not support verifying file integrity!"); 1285 } 1286 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1287 backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1288 this::createRecycler); 1289 } 1290 1291 /** 1292 * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors 1293 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1294 */ 1295 private void checkIOErrorIsTolerated() { 1296 long now = EnvironmentEdgeManager.currentTime(); 1297 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1298 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1299 if (ioErrorStartTimeTmp > 0) { 1300 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1301 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1302 + "ms, disabling cache, please check your IOEngine"); 1303 disableCache(); 1304 } 1305 } else { 1306 this.ioErrorStartTime = now; 1307 } 1308 } 1309 1310 /** 1311 * Used to shut down the cache -or- turn it off in the case of something broken. 1312 */ 1313 private void disableCache() { 1314 if (!cacheEnabled) return; 1315 cacheEnabled = false; 1316 ioEngine.shutdown(); 1317 this.scheduleThreadPool.shutdown(); 1318 for (int i = 0; i < writerThreads.length; ++i) 1319 writerThreads[i].interrupt(); 1320 this.ramCache.clear(); 1321 if (!ioEngine.isPersistent() || persistencePath == null) { 1322 // If persistent ioengine and a path, we will serialize out the backingMap. 1323 this.backingMap.clear(); 1324 } 1325 } 1326 1327 private void join() throws InterruptedException { 1328 for (int i = 0; i < writerThreads.length; ++i) 1329 writerThreads[i].join(); 1330 } 1331 1332 @Override 1333 public void shutdown() { 1334 disableCache(); 1335 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1336 + persistencePath); 1337 if (ioEngine.isPersistent() && persistencePath != null) { 1338 try { 1339 join(); 1340 persistToFile(); 1341 } catch (IOException ex) { 1342 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1343 } catch (InterruptedException e) { 1344 LOG.warn("Failed to persist data on exit", e); 1345 } 1346 } 1347 } 1348 1349 @Override 1350 public CacheStats getStats() { 1351 return cacheStats; 1352 } 1353 1354 public BucketAllocator getAllocator() { 1355 return this.bucketAllocator; 1356 } 1357 1358 @Override 1359 public long heapSize() { 1360 return this.heapSize.sum(); 1361 } 1362 1363 @Override 1364 public long size() { 1365 return this.realCacheSize.sum(); 1366 } 1367 1368 @Override 1369 public long getCurrentDataSize() { 1370 return size(); 1371 } 1372 1373 @Override 1374 public long getFreeSize() { 1375 return this.bucketAllocator.getFreeSize(); 1376 } 1377 1378 @Override 1379 public long getBlockCount() { 1380 return this.blockNumber.sum(); 1381 } 1382 1383 @Override 1384 public long getDataBlockCount() { 1385 return getBlockCount(); 1386 } 1387 1388 @Override 1389 public long getCurrentSize() { 1390 return this.bucketAllocator.getUsedSize(); 1391 } 1392 1393 protected String getAlgorithm() { 1394 return algorithm; 1395 } 1396 1397 /** 1398 * Evicts all blocks for a specific HFile. 1399 * <p> 1400 * This is used for evict-on-close to remove all blocks of a specific HFile. 1401 * @return the number of blocks evicted 1402 */ 1403 @Override 1404 public int evictBlocksByHfileName(String hfileName) { 1405 Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), 1406 true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1407 1408 int numEvicted = 0; 1409 for (BlockCacheKey key : keySet) { 1410 if (evictBlock(key)) { 1411 ++numEvicted; 1412 } 1413 } 1414 1415 return numEvicted; 1416 } 1417 1418 /** 1419 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1420 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1421 * number of elements out of each according to configuration parameters and their relative sizes. 1422 */ 1423 private class BucketEntryGroup { 1424 1425 private CachedEntryQueue queue; 1426 private long totalSize = 0; 1427 private long bucketSize; 1428 1429 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1430 this.bucketSize = bucketSize; 1431 queue = new CachedEntryQueue(bytesToFree, blockSize); 1432 totalSize = 0; 1433 } 1434 1435 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1436 totalSize += block.getValue().getLength(); 1437 queue.add(block); 1438 } 1439 1440 public long free(long toFree) { 1441 Map.Entry<BlockCacheKey, BucketEntry> entry; 1442 long freedBytes = 0; 1443 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1444 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1445 while ((entry = queue.pollLast()) != null) { 1446 BlockCacheKey blockCacheKey = entry.getKey(); 1447 BucketEntry be = entry.getValue(); 1448 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1449 freedBytes += be.getLength(); 1450 } 1451 if (freedBytes >= toFree) { 1452 return freedBytes; 1453 } 1454 } 1455 return freedBytes; 1456 } 1457 1458 public long overflow() { 1459 return totalSize - bucketSize; 1460 } 1461 1462 public long totalSize() { 1463 return totalSize; 1464 } 1465 } 1466 1467 /** 1468 * Block Entry stored in the memory with key,data and so on 1469 */ 1470 static class RAMQueueEntry { 1471 private final BlockCacheKey key; 1472 private final Cacheable data; 1473 private long accessCounter; 1474 private boolean inMemory; 1475 1476 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { 1477 this.key = bck; 1478 this.data = data; 1479 this.accessCounter = accessCounter; 1480 this.inMemory = inMemory; 1481 } 1482 1483 public Cacheable getData() { 1484 return data; 1485 } 1486 1487 public BlockCacheKey getKey() { 1488 return key; 1489 } 1490 1491 public void access(long accessCounter) { 1492 this.accessCounter = accessCounter; 1493 } 1494 1495 private ByteBuffAllocator getByteBuffAllocator() { 1496 if (data instanceof HFileBlock) { 1497 return ((HFileBlock) data).getByteBuffAllocator(); 1498 } 1499 return ByteBuffAllocator.HEAP; 1500 } 1501 1502 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1503 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1504 ByteBuffer metaBuff) throws IOException { 1505 int len = data.getSerializedLength(); 1506 // This cacheable thing can't be serialized 1507 if (len == 0) { 1508 return null; 1509 } 1510 long offset = alloc.allocateBlock(len); 1511 boolean succ = false; 1512 BucketEntry bucketEntry = null; 1513 try { 1514 bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, 1515 getByteBuffAllocator()); 1516 bucketEntry.setDeserializerReference(data.getDeserializer()); 1517 if (data instanceof HFileBlock) { 1518 // If an instance of HFileBlock, save on some allocations. 1519 HFileBlock block = (HFileBlock) data; 1520 ByteBuff sliceBuf = block.getBufferReadOnly(); 1521 block.getMetaData(metaBuff); 1522 ioEngine.write(sliceBuf, offset); 1523 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 1524 } else { 1525 // Only used for testing. 1526 ByteBuffer bb = ByteBuffer.allocate(len); 1527 data.serialize(bb, true); 1528 ioEngine.write(bb, offset); 1529 } 1530 succ = true; 1531 } finally { 1532 if (!succ) { 1533 alloc.freeBlock(offset, len); 1534 } 1535 } 1536 realCacheSize.add(len); 1537 return bucketEntry; 1538 } 1539 } 1540 1541 /** 1542 * Only used in test n 1543 */ 1544 void stopWriterThreads() throws InterruptedException { 1545 for (WriterThread writerThread : writerThreads) { 1546 writerThread.disableWriter(); 1547 writerThread.interrupt(); 1548 writerThread.join(); 1549 } 1550 } 1551 1552 @Override 1553 public Iterator<CachedBlock> iterator() { 1554 // Don't bother with ramcache since stuff is in here only a little while. 1555 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 1556 return new Iterator<CachedBlock>() { 1557 private final long now = System.nanoTime(); 1558 1559 @Override 1560 public boolean hasNext() { 1561 return i.hasNext(); 1562 } 1563 1564 @Override 1565 public CachedBlock next() { 1566 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1567 return new CachedBlock() { 1568 @Override 1569 public String toString() { 1570 return BlockCacheUtil.toString(this, now); 1571 } 1572 1573 @Override 1574 public BlockPriority getBlockPriority() { 1575 return e.getValue().getPriority(); 1576 } 1577 1578 @Override 1579 public BlockType getBlockType() { 1580 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1581 return null; 1582 } 1583 1584 @Override 1585 public long getOffset() { 1586 return e.getKey().getOffset(); 1587 } 1588 1589 @Override 1590 public long getSize() { 1591 return e.getValue().getLength(); 1592 } 1593 1594 @Override 1595 public long getCachedTime() { 1596 return e.getValue().getCachedTime(); 1597 } 1598 1599 @Override 1600 public String getFilename() { 1601 return e.getKey().getHfileName(); 1602 } 1603 1604 @Override 1605 public int compareTo(CachedBlock other) { 1606 int diff = this.getFilename().compareTo(other.getFilename()); 1607 if (diff != 0) return diff; 1608 1609 diff = Long.compare(this.getOffset(), other.getOffset()); 1610 if (diff != 0) return diff; 1611 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1612 throw new IllegalStateException( 1613 "" + this.getCachedTime() + ", " + other.getCachedTime()); 1614 } 1615 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1616 } 1617 1618 @Override 1619 public int hashCode() { 1620 return e.getKey().hashCode(); 1621 } 1622 1623 @Override 1624 public boolean equals(Object obj) { 1625 if (obj instanceof CachedBlock) { 1626 CachedBlock cb = (CachedBlock) obj; 1627 return compareTo(cb) == 0; 1628 } else { 1629 return false; 1630 } 1631 } 1632 }; 1633 } 1634 1635 @Override 1636 public void remove() { 1637 throw new UnsupportedOperationException(); 1638 } 1639 }; 1640 } 1641 1642 @Override 1643 public BlockCache[] getBlockCaches() { 1644 return null; 1645 } 1646 1647 public int getRpcRefCount(BlockCacheKey cacheKey) { 1648 BucketEntry bucketEntry = backingMap.get(cacheKey); 1649 if (bucketEntry != null) { 1650 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1); 1651 } 1652 return 0; 1653 } 1654 1655 float getAcceptableFactor() { 1656 return acceptableFactor; 1657 } 1658 1659 float getMinFactor() { 1660 return minFactor; 1661 } 1662 1663 float getExtraFreeFactor() { 1664 return extraFreeFactor; 1665 } 1666 1667 float getSingleFactor() { 1668 return singleFactor; 1669 } 1670 1671 float getMultiFactor() { 1672 return multiFactor; 1673 } 1674 1675 float getMemoryFactor() { 1676 return memoryFactor; 1677 } 1678 1679 /** 1680 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 1681 */ 1682 static class RAMCache { 1683 /** 1684 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 1685 * {@link RAMCache#get(BlockCacheKey)} and 1686 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee 1687 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the 1688 * func method can execute exactly once only when the key is present(or absent) and under the 1689 * lock context. Otherwise, the reference count of block will be messed up. Notice that the 1690 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 1691 */ 1692 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1693 1694 public boolean containsKey(BlockCacheKey key) { 1695 return delegate.containsKey(key); 1696 } 1697 1698 public RAMQueueEntry get(BlockCacheKey key) { 1699 return delegate.computeIfPresent(key, (k, re) -> { 1700 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 1701 // atomic, another thread may remove and release the block, when retaining in this thread we 1702 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 1703 re.getData().retain(); 1704 return re; 1705 }); 1706 } 1707 1708 /** 1709 * Return the previous associated value, or null if absent. It has the same meaning as 1710 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1711 */ 1712 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1713 AtomicBoolean absent = new AtomicBoolean(false); 1714 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1715 // The RAMCache reference to this entry, so reference count should be increment. 1716 entry.getData().retain(); 1717 absent.set(true); 1718 return entry; 1719 }); 1720 return absent.get() ? null : re; 1721 } 1722 1723 public boolean remove(BlockCacheKey key) { 1724 return remove(key, re -> { 1725 }); 1726 } 1727 1728 /** 1729 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 1730 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 1731 * exception. the consumer will access entry to remove before release its reference count. 1732 * Notice, don't change its reference count in the {@link Consumer} 1733 */ 1734 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1735 RAMQueueEntry previous = delegate.remove(key); 1736 action.accept(previous); 1737 if (previous != null) { 1738 previous.getData().release(); 1739 } 1740 return previous != null; 1741 } 1742 1743 public boolean isEmpty() { 1744 return delegate.isEmpty(); 1745 } 1746 1747 public void clear() { 1748 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1749 while (it.hasNext()) { 1750 RAMQueueEntry re = it.next().getValue(); 1751 it.remove(); 1752 re.getData().release(); 1753 } 1754 } 1755 } 1756}