001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.File; 021import java.io.FileInputStream; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.util.ArrayList; 026import java.util.Comparator; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.NavigableSet; 032import java.util.PriorityQueue; 033import java.util.Set; 034import java.util.concurrent.ArrayBlockingQueue; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.ConcurrentMap; 038import java.util.concurrent.ConcurrentSkipListSet; 039import java.util.concurrent.Executors; 040import java.util.concurrent.ScheduledExecutorService; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.atomic.LongAdder; 045import java.util.concurrent.locks.Lock; 046import java.util.concurrent.locks.ReentrantLock; 047import 
java.util.concurrent.locks.ReentrantReadWriteLock; 048import java.util.function.Consumer; 049import java.util.function.Function; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.HBaseConfiguration; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.io.ByteBuffAllocator; 055import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 056import org.apache.hadoop.hbase.io.HeapSize; 057import org.apache.hadoop.hbase.io.hfile.BlockCache; 058import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 059import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 060import org.apache.hadoop.hbase.io.hfile.BlockPriority; 061import org.apache.hadoop.hbase.io.hfile.BlockType; 062import org.apache.hadoop.hbase.io.hfile.CacheStats; 063import org.apache.hadoop.hbase.io.hfile.Cacheable; 064import org.apache.hadoop.hbase.io.hfile.CachedBlock; 065import org.apache.hadoop.hbase.io.hfile.HFileBlock; 066import org.apache.hadoop.hbase.io.hfile.HFileContext; 067import org.apache.hadoop.hbase.nio.ByteBuff; 068import org.apache.hadoop.hbase.nio.RefCnt; 069import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.IdReadWriteLock; 073import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 074import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 075import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 076import org.apache.hadoop.util.StringUtils; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 082import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 083 084import 
org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 085 086/** 087 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 088 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 089 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 090 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 091 * {@link FileIOEngine} to store/read the block data. 092 * <p> 093 * Eviction is via a similar algorithm as used in 094 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 095 * <p> 096 * BucketCache can be used as mainly a block cache (see 097 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 098 * decrease CMS GC and heap fragmentation. 099 * <p> 100 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 101 * enlarge cache space via a victim cache. 
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /**
   * Configuration keys for the priority-bucket partition factors and the eviction thresholds.
   * They are read in the constructor and validated by sanityCheckConfigs().
   */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /**
   * Whether offsetLock uses strong references (IdReadWriteLockStrongRef) instead of the
   * soft-reference object pool (IdReadWriteLockWithObjectPool).
   */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** Default priority-bucket partition factors; single + multi + memory must add up to 1.0. */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full.
  // NOTE(review): not referenced in the visible part of this class; presumably used by freeSpace.
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Period, in seconds, between two runs of the statistics logger (5 minutes). */
  private static final int statThreadPeriod = 5 * 60;

  // Default writer thread count and per-writer queue capacity.
  // NOTE(review): not referenced in the visible part of this class; presumably used by callers.
  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Engine that stores and reads the block data; chosen from the engine name in the constructor.
  transient final IOEngine ioEngine;

  // Blocks are staged in this map before a WriterThread writes them to the IOEngine.
  transient final RAMCache ramCache;
  // Metadata (offset, length, ...) of the blocks already written to the IOEngine.
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server. Caching, lookups and
   * evictions all become no-ops while it is false.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  // Ensures only one freeSpace run at a time (acquired with tryLock, so callers never block).
  private transient final Lock freeSpaceLock = new ReentrantLock();

  // Bytes of block data held in buckets; reduced by freeBucketEntry, reported by getRealCacheSize.
  private final LongAdder realCacheSize = new LongAdder();
  // Heap size of the entries staged in ramCache: added when a block is queued, subtracted when
  // the entry is removed from ramCache.
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID), incremented on every caching request and cache hit */
  private final AtomicLong accessCount = new AtomicLong();

  // Milliseconds to wait for room in a writer queue when a caller asks to wait.
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  /**
   * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
   * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
   * IOEngine.
   */
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size; capacity / blockSize bounds the number of cached blocks. */
  private final long blockSize;

  /** Duration (ms) of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read: getBlock holds the read lock while
   * reading, and evictions remove the entry under the write lock.
   * <p>
   */
  transient final IdReadWriteLock<Long> offsetLock;

  // Cached block keys ordered by HFile name, then by block offset within that file.
  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
    if (nameComparison != 0) {
      return nameComparison;
    }
    return Long.compare(a.getOffset(), b.getOffset());
  });

  /**
   * Single daemon thread that periodically runs the statistics logger (scheduled in the
   * constructor). For heavy debugging; could be removed.
   */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocates and frees bucket space for blocks.
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting.
For example free the number of 229 * blocks requested * (1 + extraFreeFactor) 230 */ 231 private float extraFreeFactor; 232 233 /** Single access bucket size */ 234 private float singleFactor; 235 236 /** Multiple access bucket size */ 237 private float multiFactor; 238 239 /** In-memory bucket size */ 240 private float memoryFactor; 241 242 private static final String FILE_VERIFY_ALGORITHM = 243 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 244 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 245 246 /** 247 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 248 * integrity, default algorithm is MD5 249 */ 250 private String algorithm; 251 252 /* Tracing failed Bucket Cache allocations. */ 253 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 254 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 255 256 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 257 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 258 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 259 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 260 } 261 262 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 263 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 264 Configuration conf) throws IOException { 265 boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT); 266 if (useStrongRef) { 267 this.offsetLock = new IdReadWriteLockStrongRef<>(); 268 } else { 269 this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); 270 } 271 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 272 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 273 
this.writerThreads = new WriterThread[writerThreadNum]; 274 long blockNumCapacity = capacity / blockSize; 275 if (blockNumCapacity >= Integer.MAX_VALUE) { 276 // Enough for about 32TB of cache! 277 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 278 } 279 280 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 281 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 282 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 283 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 284 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 285 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 286 287 sanityCheckConfigs(); 288 289 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 290 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 291 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 292 + ", useStrongRef: " + useStrongRef); 293 294 this.cacheCapacity = capacity; 295 this.persistencePath = persistencePath; 296 this.blockSize = blockSize; 297 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 298 299 this.allocFailLogPrevTs = 0; 300 301 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 302 for (int i = 0; i < writerThreads.length; ++i) { 303 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 304 } 305 306 assert writerQueues.size() == writerThreads.length; 307 this.ramCache = new RAMCache(); 308 309 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 310 311 if (ioEngine.isPersistent() && persistencePath != null) { 312 try { 313 retrieveFromFile(bucketSizes); 314 } catch (IOException ioex) { 315 LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); 316 } 317 } 
318 final String threadName = Thread.currentThread().getName(); 319 this.cacheEnabled = true; 320 for (int i = 0; i < writerThreads.length; ++i) { 321 writerThreads[i] = new WriterThread(writerQueues.get(i)); 322 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 323 writerThreads[i].setDaemon(true); 324 } 325 startWriterThreads(); 326 327 // Run the statistics thread periodically to print the cache statistics log 328 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 329 // every five minutes. 330 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 331 statThreadPeriod, TimeUnit.SECONDS); 332 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 333 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 334 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 335 + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 336 } 337 338 private void sanityCheckConfigs() { 339 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 340 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 341 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 342 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 343 Preconditions.checkArgument(minFactor <= acceptableFactor, 344 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 345 Preconditions.checkArgument(extraFreeFactor >= 0, 346 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 347 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 348 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 349 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 350 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 351 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 352 MEMORY_FACTOR_CONFIG_NAME + " 
must be between 0.0 and 1.0"); 353 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 354 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 355 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 356 } 357 358 /** 359 * Called by the constructor to start the writer threads. Used by tests that need to override 360 * starting the threads. 361 */ 362 protected void startWriterThreads() { 363 for (WriterThread thread : writerThreads) { 364 thread.start(); 365 } 366 } 367 368 boolean isCacheEnabled() { 369 return this.cacheEnabled; 370 } 371 372 @Override 373 public long getMaxSize() { 374 return this.cacheCapacity; 375 } 376 377 public String getIoEngine() { 378 return ioEngine.toString(); 379 } 380 381 /** 382 * Get the IOEngine from the IO engine name nnn * @return the IOEngine n 383 */ 384 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 385 throws IOException { 386 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 387 // In order to make the usage simple, we only need the prefix 'files:' in 388 // document whether one or multiple file(s), but also support 'file:' for 389 // the compatibility 390 String[] filePaths = 391 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 392 return new FileIOEngine(capacity, persistencePath != null, filePaths); 393 } else if (ioEngineName.startsWith("offheap")) { 394 return new ByteBufferIOEngine(capacity); 395 } else if (ioEngineName.startsWith("mmap:")) { 396 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 397 } else if (ioEngineName.startsWith("pmem:")) { 398 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 399 // device. 
Since the persistent memory device has its own address space the contents 400 // mapped to this address space does not get swapped out like in the case of mmapping 401 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 402 // can be directly referred to without having to copy them onheap. Once the RPC is done, 403 // the blocks can be returned back as in case of ByteBufferIOEngine. 404 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 405 } else { 406 throw new IllegalArgumentException( 407 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 408 } 409 } 410 411 /** 412 * Cache the block with the specified name and buffer. 413 * @param cacheKey block's cache key 414 * @param buf block buffer 415 */ 416 @Override 417 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 418 cacheBlock(cacheKey, buf, false); 419 } 420 421 /** 422 * Cache the block with the specified name and buffer. 
423 * @param cacheKey block's cache key 424 * @param cachedItem block buffer 425 * @param inMemory if block is in-memory 426 */ 427 @Override 428 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 429 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 430 } 431 432 /** 433 * Cache the block to ramCache 434 * @param cacheKey block's cache key 435 * @param cachedItem block buffer 436 * @param inMemory if block is in-memory 437 * @param wait if true, blocking wait when queue is full 438 */ 439 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 440 boolean wait) { 441 if (cacheEnabled) { 442 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 443 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 444 BucketEntry bucketEntry = backingMap.get(cacheKey); 445 if (bucketEntry != null && bucketEntry.isRpcRef()) { 446 // avoid replace when there are RPC refs for the bucket entry in bucket cache 447 return; 448 } 449 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 450 } 451 } else { 452 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 453 } 454 } 455 } 456 457 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 458 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 459 } 460 461 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 462 boolean inMemory, boolean wait) { 463 if (!cacheEnabled) { 464 return; 465 } 466 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 467 // Stuff the entry into the RAM cache so it can get drained to the persistent store 468 RAMQueueEntry re = 469 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 470 /** 471 * Don't use ramCache.put(cacheKey, re) here. 
because there may be a existing entry with same 472 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 473 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 474 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 475 * one, then the heap size will mess up (HBASE-20789) 476 */ 477 if (ramCache.putIfAbsent(cacheKey, re) != null) { 478 return; 479 } 480 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 481 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 482 boolean successfulAddition = false; 483 if (wait) { 484 try { 485 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 486 } catch (InterruptedException e) { 487 Thread.currentThread().interrupt(); 488 } 489 } else { 490 successfulAddition = bq.offer(re); 491 } 492 if (!successfulAddition) { 493 ramCache.remove(cacheKey); 494 cacheStats.failInsert(); 495 } else { 496 this.blockNumber.increment(); 497 this.heapSize.add(cachedItem.heapSize()); 498 } 499 } 500 501 /** 502 * Get the buffer of the block with the specified key. 
503 * @param key block's cache key 504 * @param caching true if the caller caches blocks on cache misses 505 * @param repeat Whether this is a repeat lookup for the same block 506 * @param updateCacheMetrics Whether we should update cache metrics or not 507 * @return buffer of specified cache key, or null if not in cache 508 */ 509 @Override 510 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 511 boolean updateCacheMetrics) { 512 if (!cacheEnabled) { 513 return null; 514 } 515 RAMQueueEntry re = ramCache.get(key); 516 if (re != null) { 517 if (updateCacheMetrics) { 518 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 519 } 520 re.access(accessCount.incrementAndGet()); 521 return re.getData(); 522 } 523 BucketEntry bucketEntry = backingMap.get(key); 524 if (bucketEntry != null) { 525 long start = System.nanoTime(); 526 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 527 try { 528 lock.readLock().lock(); 529 // We can not read here even if backingMap does contain the given key because its offset 530 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 531 // existence here. 532 if (bucketEntry.equals(backingMap.get(key))) { 533 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 534 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 535 // the same BucketEntry, then all of the three will share the same refCnt. 536 Cacheable cachedBlock = ioEngine.read(bucketEntry); 537 if (ioEngine.usesSharedMemory()) { 538 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 539 // same RefCnt, do retain here, in order to count the number of RPC references 540 cachedBlock.retain(); 541 } 542 // Update the cache statistics. 
543 if (updateCacheMetrics) { 544 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 545 cacheStats.ioHit(System.nanoTime() - start); 546 } 547 bucketEntry.access(accessCount.incrementAndGet()); 548 if (this.ioErrorStartTime > 0) { 549 ioErrorStartTime = -1; 550 } 551 return cachedBlock; 552 } 553 } catch (IOException ioex) { 554 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 555 checkIOErrorIsTolerated(); 556 } finally { 557 lock.readLock().unlock(); 558 } 559 } 560 if (!repeat && updateCacheMetrics) { 561 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 562 } 563 return null; 564 } 565 566 /** 567 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 568 */ 569 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 570 bucketEntry.markAsEvicted(); 571 blocksByHFile.remove(cacheKey); 572 if (decrementBlockNumber) { 573 this.blockNumber.decrement(); 574 } 575 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 576 } 577 578 /** 579 * Free the {{@link BucketEntry} actually,which could only be invoked when the 580 * {@link BucketEntry#refCnt} becoming 0. 581 */ 582 void freeBucketEntry(BucketEntry bucketEntry) { 583 bucketAllocator.freeBlock(bucketEntry.offset()); 584 realCacheSize.add(-1 * bucketEntry.getLength()); 585 } 586 587 /** 588 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 589 * 1. Close an HFile, and clear all cached blocks. <br> 590 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 591 * <p> 592 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 593 * Here we evict the block from backingMap immediately, but only free the reference from bucket 594 * cache by calling {@link BucketEntry#markedAsEvicted}. 
If there're still some RPC referring this 595 * block, block can only be de-allocated when all of them release the block. 596 * <p> 597 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 598 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 599 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 600 * @param cacheKey Block to evict 601 * @return true to indicate whether we've evicted successfully or not. 602 */ 603 @Override 604 public boolean evictBlock(BlockCacheKey cacheKey) { 605 return doEvictBlock(cacheKey, null); 606 } 607 608 /** 609 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 610 * {@link BucketCache#ramCache}. <br/> 611 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 612 * {@link BucketEntry} could be removed. 613 * @param cacheKey {@link BlockCacheKey} to evict. 614 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 615 * @return true to indicate whether we've evicted successfully or not. 
616 */ 617 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) { 618 if (!cacheEnabled) { 619 return false; 620 } 621 boolean existedInRamCache = removeFromRamCache(cacheKey); 622 if (bucketEntry == null) { 623 bucketEntry = backingMap.get(cacheKey); 624 } 625 final BucketEntry bucketEntryToUse = bucketEntry; 626 627 if (bucketEntryToUse == null) { 628 if (existedInRamCache) { 629 cacheStats.evicted(0, cacheKey.isPrimary()); 630 } 631 return existedInRamCache; 632 } else { 633 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 634 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 635 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache); 636 return true; 637 } 638 return false; 639 }); 640 } 641 } 642 643 /** 644 * <pre> 645 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 646 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 647 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 648 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 649 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 650 * it is the return value of current {@link BucketCache#createRecycler} method. 651 * 652 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 653 * it is {@link ByteBuffAllocator#putbackBuffer}. 654 * </pre> 655 */ 656 private Recycler createRecycler(final BucketEntry bucketEntry) { 657 return () -> { 658 freeBucketEntry(bucketEntry); 659 return; 660 }; 661 } 662 663 /** 664 * NOTE: This method is only for test. 
665 */ 666 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 667 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 668 if (bucketEntry == null) { 669 return false; 670 } 671 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 672 } 673 674 /** 675 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 676 * {@link BucketEntry#isRpcRef} is false. <br/> 677 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 678 * {@link BucketEntry} could be removed. 679 * @param blockCacheKey {@link BlockCacheKey} to evict. 680 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 681 * @return true to indicate whether we've evicted successfully or not. 682 */ 683 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 684 if (!bucketEntry.isRpcRef()) { 685 return doEvictBlock(blockCacheKey, bucketEntry); 686 } 687 return false; 688 } 689 690 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 691 return ramCache.remove(cacheKey, re -> { 692 if (re != null) { 693 this.blockNumber.decrement(); 694 this.heapSize.add(-1 * re.getData().heapSize()); 695 } 696 }); 697 } 698 699 /* 700 * Statistics thread. Periodically output cache statistics to the log. 
701 */ 702 private static class StatisticsThread extends Thread { 703 private final BucketCache bucketCache; 704 705 public StatisticsThread(BucketCache bucketCache) { 706 super("BucketCacheStatsThread"); 707 setDaemon(true); 708 this.bucketCache = bucketCache; 709 } 710 711 @Override 712 public void run() { 713 bucketCache.logStats(); 714 } 715 } 716 717 public void logStats() { 718 long totalSize = bucketAllocator.getTotalSize(); 719 long usedSize = bucketAllocator.getUsedSize(); 720 long freeSize = totalSize - usedSize; 721 long cacheSize = getRealCacheSize(); 722 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 723 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 724 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 725 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 726 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 727 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 728 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 729 + (cacheStats.getHitCount() == 0 730 ? "0," 731 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 732 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 733 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 734 + (cacheStats.getHitCachingCount() == 0 735 ? 
"0," 736 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 737 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 738 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 739 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount()); 740 cacheStats.reset(); 741 } 742 743 public long getRealCacheSize() { 744 return this.realCacheSize.sum(); 745 } 746 747 public long acceptableSize() { 748 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 749 } 750 751 long getPartitionSize(float partitionFactor) { 752 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 753 } 754 755 /** 756 * Return the count of bucketSizeinfos still need free space 757 */ 758 private int bucketSizesAboveThresholdCount(float minFactor) { 759 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 760 int fullCount = 0; 761 for (int i = 0; i < stats.length; i++) { 762 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 763 freeGoal = Math.max(freeGoal, 1); 764 if (stats[i].freeCount() < freeGoal) { 765 fullCount++; 766 } 767 } 768 return fullCount; 769 } 770 771 /** 772 * This method will find the buckets that are minimally occupied and are not reference counted and 773 * will free them completely without any constraint on the access times of the elements, and as a 774 * process will completely free at most the number of buckets passed, sometimes it might not due 775 * to changing refCounts 776 * @param completelyFreeBucketsNeeded number of buckets to free 777 **/ 778 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 779 if (completelyFreeBucketsNeeded != 0) { 780 // First we will build a set where the offsets are reference counted, usually 781 // this set is small around O(Handler Count) unless something else is wrong 782 Set<Integer> inUseBuckets = new HashSet<>(); 783 
backingMap.forEach((k, be) -> { 784 if (be.isRpcRef()) { 785 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 786 } 787 }); 788 Set<Integer> candidateBuckets = 789 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 790 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 791 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 792 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 793 } 794 } 795 } 796 } 797 798 /** 799 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 800 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 801 * blocks evicted 802 * @param why Why we are being called 803 */ 804 private void freeSpace(final String why) { 805 // Ensure only one freeSpace progress at a time 806 if (!freeSpaceLock.tryLock()) { 807 return; 808 } 809 try { 810 freeInProgress = true; 811 long bytesToFreeWithoutExtra = 0; 812 // Calculate free byte for each bucketSizeinfo 813 StringBuilder msgBuffer = LOG.isDebugEnabled() ? 
new StringBuilder() : null; 814 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 815 long[] bytesToFreeForBucket = new long[stats.length]; 816 for (int i = 0; i < stats.length; i++) { 817 bytesToFreeForBucket[i] = 0; 818 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 819 freeGoal = Math.max(freeGoal, 1); 820 if (stats[i].freeCount() < freeGoal) { 821 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 822 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 823 if (msgBuffer != null) { 824 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 825 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 826 } 827 } 828 } 829 if (msgBuffer != null) { 830 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 831 } 832 833 if (bytesToFreeWithoutExtra <= 0) { 834 return; 835 } 836 long currentSize = bucketAllocator.getUsedSize(); 837 long totalSize = bucketAllocator.getTotalSize(); 838 if (LOG.isDebugEnabled() && msgBuffer != null) { 839 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() 840 + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 841 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 842 + StringUtils.byteDesc(totalSize)); 843 } 844 845 long bytesToFreeWithExtra = 846 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 847 848 // Instantiate priority buckets 849 BucketEntryGroup bucketSingle = 850 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 851 BucketEntryGroup bucketMulti = 852 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 853 BucketEntryGroup bucketMemory = 854 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 855 856 // Scan entire map putting bucket entry into appropriate bucket entry 857 // group 858 for 
(Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 859 switch (bucketEntryWithKey.getValue().getPriority()) { 860 case SINGLE: { 861 bucketSingle.add(bucketEntryWithKey); 862 break; 863 } 864 case MULTI: { 865 bucketMulti.add(bucketEntryWithKey); 866 break; 867 } 868 case MEMORY: { 869 bucketMemory.add(bucketEntryWithKey); 870 break; 871 } 872 } 873 } 874 875 PriorityQueue<BucketEntryGroup> bucketQueue = 876 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 877 878 bucketQueue.add(bucketSingle); 879 bucketQueue.add(bucketMulti); 880 bucketQueue.add(bucketMemory); 881 882 int remainingBuckets = bucketQueue.size(); 883 long bytesFreed = 0; 884 885 BucketEntryGroup bucketGroup; 886 while ((bucketGroup = bucketQueue.poll()) != null) { 887 long overflow = bucketGroup.overflow(); 888 if (overflow > 0) { 889 long bucketBytesToFree = 890 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 891 bytesFreed += bucketGroup.free(bucketBytesToFree); 892 } 893 remainingBuckets--; 894 } 895 896 // Check and free if there are buckets that still need freeing of space 897 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 898 bucketQueue.clear(); 899 remainingBuckets = 3; 900 901 bucketQueue.add(bucketSingle); 902 bucketQueue.add(bucketMulti); 903 bucketQueue.add(bucketMemory); 904 905 while ((bucketGroup = bucketQueue.poll()) != null) { 906 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 907 bytesFreed += bucketGroup.free(bucketBytesToFree); 908 remainingBuckets--; 909 } 910 } 911 912 // Even after the above free we might still need freeing because of the 913 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 914 // there might be some buckets where the occupancy is very sparse and thus are not 915 // yielding the free for the other bucket sizes, the fix for this to evict some 916 // of the buckets, we do this by evicting the 
buckets that are least fulled 917 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 918 919 if (LOG.isDebugEnabled()) { 920 long single = bucketSingle.totalSize(); 921 long multi = bucketMulti.totalSize(); 922 long memory = bucketMemory.totalSize(); 923 if (LOG.isDebugEnabled()) { 924 LOG.debug("Bucket cache free space completed; " + "freed=" 925 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 926 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 927 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 928 } 929 } 930 931 } catch (Throwable t) { 932 LOG.warn("Failed freeing space", t); 933 } finally { 934 cacheStats.evict(); 935 freeInProgress = false; 936 freeSpaceLock.unlock(); 937 } 938 } 939 940 // This handles flushing the RAM cache to IOEngine. 941 class WriterThread extends Thread { 942 private final BlockingQueue<RAMQueueEntry> inputQueue; 943 private volatile boolean writerEnabled = true; 944 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 945 946 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 947 super("BucketCacheWriterThread"); 948 this.inputQueue = queue; 949 } 950 951 // Used for test 952 void disableWriter() { 953 this.writerEnabled = false; 954 } 955 956 @Override 957 public void run() { 958 List<RAMQueueEntry> entries = new ArrayList<>(); 959 try { 960 while (cacheEnabled && writerEnabled) { 961 try { 962 try { 963 // Blocks 964 entries = getRAMQueueEntries(inputQueue, entries); 965 } catch (InterruptedException ie) { 966 if (!cacheEnabled || !writerEnabled) { 967 break; 968 } 969 } 970 doDrain(entries, metaBuff); 971 } catch (Exception ioe) { 972 LOG.error("WriterThread encountered error", ioe); 973 } 974 } 975 } catch (Throwable t) { 976 LOG.warn("Failed doing drain", t); 977 } 978 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 979 } 980 } 981 982 /** 
983 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 984 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 985 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 986 * with the same cache key do the same thing for the same cache key, so if not evict the previous 987 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 988 * bucketAllocator do not free its memory. 989 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 990 * cacheKey, Cacheable newBlock) 991 * @param key Block cache key 992 * @param bucketEntry Bucket entry to put into backingMap. 993 */ 994 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 995 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 996 if (previousEntry != null && previousEntry != bucketEntry) { 997 previousEntry.withWriteLock(offsetLock, () -> { 998 blockEvicted(key, previousEntry, false); 999 return null; 1000 }); 1001 } 1002 } 1003 1004 /** 1005 * Prepare and return a warning message for Bucket Allocator Exception 1006 * @param fle The exception 1007 * @param re The RAMQueueEntry for which the exception was thrown. 1008 * @return A warning message created from the input RAMQueueEntry object. 
1009 */ 1010 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1011 final RAMQueueEntry re) { 1012 final StringBuilder sb = new StringBuilder(); 1013 sb.append("Most recent failed allocation after "); 1014 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1015 sb.append(" ms;"); 1016 if (re != null) { 1017 if (re.getData() instanceof HFileBlock) { 1018 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1019 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1020 final String tableName = Bytes.toString(fileContext.getTableName()); 1021 if (tableName != null && columnFamily != null) { 1022 sb.append(" Table: "); 1023 sb.append(tableName); 1024 sb.append(" CF: "); 1025 sb.append(columnFamily); 1026 sb.append(" HFile: "); 1027 sb.append(fileContext.getHFileName()); 1028 } 1029 } else { 1030 sb.append(" HFile: "); 1031 sb.append(re.getKey()); 1032 } 1033 } 1034 sb.append(" Message: "); 1035 sb.append(fle.getMessage()); 1036 return sb.toString(); 1037 } 1038 1039 /** 1040 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1041 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1042 * references and we'll OOME. 1043 * @param entries Presumes list passed in here will be processed by this invocation only. No 1044 * interference expected. 1045 */ 1046 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1047 if (entries.isEmpty()) { 1048 return; 1049 } 1050 // This method is a little hard to follow. We run through the passed in entries and for each 1051 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1052 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1053 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1054 // filling ramCache. 
We do the clean up by again running through the passed in entries 1055 // doing extra work when we find a non-null bucketEntries corresponding entry. 1056 final int size = entries.size(); 1057 BucketEntry[] bucketEntries = new BucketEntry[size]; 1058 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1059 // when we go to add an entry by going around the loop again without upping the index. 1060 int index = 0; 1061 while (cacheEnabled && index < size) { 1062 RAMQueueEntry re = null; 1063 try { 1064 re = entries.get(index); 1065 if (re == null) { 1066 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1067 index++; 1068 continue; 1069 } 1070 BlockCacheKey cacheKey = re.getKey(); 1071 if (ramCache.containsKey(cacheKey)) { 1072 blocksByHFile.add(cacheKey); 1073 } 1074 // Reset the position for reuse. 1075 // It should be guaranteed that the data in the metaBuff has been transferred to the 1076 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1077 // transferred with our current IOEngines. Should take care, when we have new kinds of 1078 // IOEngine in the future. 1079 metaBuff.clear(); 1080 BucketEntry bucketEntry = 1081 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1082 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 1083 bucketEntries[index] = bucketEntry; 1084 if (ioErrorStartTime > 0) { 1085 ioErrorStartTime = -1; 1086 } 1087 index++; 1088 } catch (BucketAllocatorException fle) { 1089 long currTs = EnvironmentEdgeManager.currentTime(); 1090 cacheStats.allocationFailed(); // Record the warning. 1091 if ( 1092 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1093 ) { 1094 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1095 allocFailLogPrevTs = currTs; 1096 } 1097 // Presume can't add. Too big? Move index on. 
Entry will be cleared from ramCache below. 1098 bucketEntries[index] = null; 1099 index++; 1100 } catch (CacheFullException cfe) { 1101 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1102 if (!freeInProgress) { 1103 freeSpace("Full!"); 1104 } else { 1105 Thread.sleep(50); 1106 } 1107 } catch (IOException ioex) { 1108 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1109 LOG.error("Failed writing to bucket cache", ioex); 1110 checkIOErrorIsTolerated(); 1111 } 1112 } 1113 1114 // Make sure data pages are written on media before we update maps. 1115 try { 1116 ioEngine.sync(); 1117 } catch (IOException ioex) { 1118 LOG.error("Failed syncing IO engine", ioex); 1119 checkIOErrorIsTolerated(); 1120 // Since we failed sync, free the blocks in bucket allocator 1121 for (int i = 0; i < entries.size(); ++i) { 1122 if (bucketEntries[i] != null) { 1123 bucketAllocator.freeBlock(bucketEntries[i].offset()); 1124 bucketEntries[i] = null; 1125 } 1126 } 1127 } 1128 1129 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1130 // success or error. 1131 for (int i = 0; i < size; ++i) { 1132 BlockCacheKey key = entries.get(i).getKey(); 1133 // Only add if non-null entry. 1134 if (bucketEntries[i] != null) { 1135 putIntoBackingMap(key, bucketEntries[i]); 1136 } 1137 // Always remove from ramCache even if we failed adding it to the block cache above. 1138 boolean existed = ramCache.remove(key, re -> { 1139 if (re != null) { 1140 heapSize.add(-1 * re.getData().heapSize()); 1141 } 1142 }); 1143 if (!existed && bucketEntries[i] != null) { 1144 // Block should have already been evicted. Remove it and free space. 
1145 final BucketEntry bucketEntry = bucketEntries[i]; 1146 bucketEntry.withWriteLock(offsetLock, () -> { 1147 if (backingMap.remove(key, bucketEntry)) { 1148 blockEvicted(key, bucketEntry, false); 1149 } 1150 return null; 1151 }); 1152 } 1153 } 1154 1155 long used = bucketAllocator.getUsedSize(); 1156 if (used > acceptableSize()) { 1157 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1158 } 1159 return; 1160 } 1161 1162 /** 1163 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1164 * returning. 1165 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1166 * in case. 1167 * @param q The queue to take from. 1168 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1169 */ 1170 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1171 List<RAMQueueEntry> receptacle) throws InterruptedException { 1172 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1173 // ok even if list grew to accommodate thousands. 
1174 receptacle.clear(); 1175 receptacle.add(q.take()); 1176 q.drainTo(receptacle); 1177 return receptacle; 1178 } 1179 1180 /** 1181 * @see #retrieveFromFile(int[]) 1182 */ 1183 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1184 justification = "false positive, try-with-resources ensures close is called.") 1185 private void persistToFile() throws IOException { 1186 assert !cacheEnabled; 1187 if (!ioEngine.isPersistent()) { 1188 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1189 } 1190 try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { 1191 fos.write(ProtobufMagic.PB_MAGIC); 1192 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1193 } 1194 } 1195 1196 /** 1197 * @see #persistToFile() 1198 */ 1199 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1200 File persistenceFile = new File(persistencePath); 1201 if (!persistenceFile.exists()) { 1202 return; 1203 } 1204 assert !cacheEnabled; 1205 1206 try (FileInputStream in = deleteFileOnClose(persistenceFile)) { 1207 int pblen = ProtobufMagic.lengthOfPBMagic(); 1208 byte[] pbuf = new byte[pblen]; 1209 int read = in.read(pbuf); 1210 if (read != pblen) { 1211 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1212 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1213 } 1214 if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { 1215 // In 3.0 we have enough flexibility to dump the old cache data. 1216 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1217 throw new IOException( 1218 "Persistence file does not start with protobuf magic number. 
" + persistencePath); 1219 } 1220 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1221 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1222 blockNumber.add(backingMap.size()); 1223 } 1224 } 1225 1226 /** 1227 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1228 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1229 * 1230 * <pre> 1231 * File f = ... 1232 * try (FileInputStream fis = new FileInputStream(f)) { 1233 * // use the input stream 1234 * } finally { 1235 * if (!f.delete()) throw new IOException("failed to delete"); 1236 * } 1237 * </pre> 1238 * 1239 * @param file the file to read and delete 1240 * @return a FileInputStream for the given file 1241 * @throws IOException if there is a problem creating the stream 1242 */ 1243 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1244 return new FileInputStream(file) { 1245 private File myFile; 1246 1247 private FileInputStream init(File file) { 1248 myFile = file; 1249 return this; 1250 } 1251 1252 @Override 1253 public void close() throws IOException { 1254 // close() will be called during try-with-resources and it will be 1255 // called by finalizer thread during GC. To avoid double-free resource, 1256 // set myFile to null after the first call. 
1257 if (myFile == null) { 1258 return; 1259 } 1260 1261 super.close(); 1262 if (!myFile.delete()) { 1263 throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); 1264 } 1265 myFile = null; 1266 } 1267 }.init(file); 1268 } 1269 1270 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1271 throws IOException { 1272 if (capacitySize != cacheCapacity) { 1273 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1274 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1275 } 1276 if (!ioEngine.getClass().getName().equals(ioclass)) { 1277 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1278 + ioEngine.getClass().getName()); 1279 } 1280 if (!backingMap.getClass().getName().equals(mapclass)) { 1281 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1282 + backingMap.getClass().getName()); 1283 } 1284 } 1285 1286 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1287 if (proto.hasChecksum()) { 1288 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1289 algorithm); 1290 } else { 1291 // if has not checksum, it means the persistence file is old format 1292 LOG.info("Persistent file is old format, it does not support verifying file integrity!"); 1293 } 1294 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1295 backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1296 this::createRecycler); 1297 } 1298 1299 /** 1300 * Check whether we tolerate IO error this time. 
If the duration of IOEngine throwing errors 1301 * exceeds ioErrorsDurationTimeTolerated, we will disable the cache 1302 */ 1303 private void checkIOErrorIsTolerated() { 1304 long now = EnvironmentEdgeManager.currentTime(); 1305 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1306 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1307 if (ioErrorStartTimeTmp > 0) { 1308 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1309 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1310 + "ms, disabling cache, please check your IOEngine"); 1311 disableCache(); 1312 } 1313 } else { 1314 this.ioErrorStartTime = now; 1315 } 1316 } 1317 1318 /** 1319 * Used to shut down the cache -or- turn it off in the case of something broken. 1320 */ 1321 private void disableCache() { 1322 if (!cacheEnabled) return; 1323 cacheEnabled = false; 1324 ioEngine.shutdown(); 1325 this.scheduleThreadPool.shutdown(); 1326 for (int i = 0; i < writerThreads.length; ++i) 1327 writerThreads[i].interrupt(); 1328 this.ramCache.clear(); 1329 if (!ioEngine.isPersistent() || persistencePath == null) { 1330 // If persistent ioengine and a path, we will serialize out the backingMap. 
1331 this.backingMap.clear(); 1332 } 1333 } 1334 1335 private void join() throws InterruptedException { 1336 for (int i = 0; i < writerThreads.length; ++i) 1337 writerThreads[i].join(); 1338 } 1339 1340 @Override 1341 public void shutdown() { 1342 disableCache(); 1343 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1344 + persistencePath); 1345 if (ioEngine.isPersistent() && persistencePath != null) { 1346 try { 1347 join(); 1348 persistToFile(); 1349 } catch (IOException ex) { 1350 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1351 } catch (InterruptedException e) { 1352 LOG.warn("Failed to persist data on exit", e); 1353 } 1354 } 1355 } 1356 1357 @Override 1358 public CacheStats getStats() { 1359 return cacheStats; 1360 } 1361 1362 public BucketAllocator getAllocator() { 1363 return this.bucketAllocator; 1364 } 1365 1366 @Override 1367 public long heapSize() { 1368 return this.heapSize.sum(); 1369 } 1370 1371 @Override 1372 public long size() { 1373 return this.realCacheSize.sum(); 1374 } 1375 1376 @Override 1377 public long getCurrentDataSize() { 1378 return size(); 1379 } 1380 1381 @Override 1382 public long getFreeSize() { 1383 return this.bucketAllocator.getFreeSize(); 1384 } 1385 1386 @Override 1387 public long getBlockCount() { 1388 return this.blockNumber.sum(); 1389 } 1390 1391 @Override 1392 public long getDataBlockCount() { 1393 return getBlockCount(); 1394 } 1395 1396 @Override 1397 public long getCurrentSize() { 1398 return this.bucketAllocator.getUsedSize(); 1399 } 1400 1401 protected String getAlgorithm() { 1402 return algorithm; 1403 } 1404 1405 /** 1406 * Evicts all blocks for a specific HFile. 1407 * <p> 1408 * This is used for evict-on-close to remove all blocks of a specific HFile. 
1409 * @return the number of blocks evicted 1410 */ 1411 @Override 1412 public int evictBlocksByHfileName(String hfileName) { 1413 Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), 1414 true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1415 1416 int numEvicted = 0; 1417 for (BlockCacheKey key : keySet) { 1418 if (evictBlock(key)) { 1419 ++numEvicted; 1420 } 1421 } 1422 1423 return numEvicted; 1424 } 1425 1426 /** 1427 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1428 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1429 * number of elements out of each according to configuration parameters and their relative sizes. 1430 */ 1431 private class BucketEntryGroup { 1432 1433 private CachedEntryQueue queue; 1434 private long totalSize = 0; 1435 private long bucketSize; 1436 1437 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1438 this.bucketSize = bucketSize; 1439 queue = new CachedEntryQueue(bytesToFree, blockSize); 1440 totalSize = 0; 1441 } 1442 1443 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1444 totalSize += block.getValue().getLength(); 1445 queue.add(block); 1446 } 1447 1448 public long free(long toFree) { 1449 Map.Entry<BlockCacheKey, BucketEntry> entry; 1450 long freedBytes = 0; 1451 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1452 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
1453 while ((entry = queue.pollLast()) != null) { 1454 BlockCacheKey blockCacheKey = entry.getKey(); 1455 BucketEntry be = entry.getValue(); 1456 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1457 freedBytes += be.getLength(); 1458 } 1459 if (freedBytes >= toFree) { 1460 return freedBytes; 1461 } 1462 } 1463 return freedBytes; 1464 } 1465 1466 public long overflow() { 1467 return totalSize - bucketSize; 1468 } 1469 1470 public long totalSize() { 1471 return totalSize; 1472 } 1473 } 1474 1475 /** 1476 * Block Entry stored in the memory with key,data and so on 1477 */ 1478 static class RAMQueueEntry { 1479 private final BlockCacheKey key; 1480 private final Cacheable data; 1481 private long accessCounter; 1482 private boolean inMemory; 1483 1484 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { 1485 this.key = bck; 1486 this.data = data; 1487 this.accessCounter = accessCounter; 1488 this.inMemory = inMemory; 1489 } 1490 1491 public Cacheable getData() { 1492 return data; 1493 } 1494 1495 public BlockCacheKey getKey() { 1496 return key; 1497 } 1498 1499 public void access(long accessCounter) { 1500 this.accessCounter = accessCounter; 1501 } 1502 1503 private ByteBuffAllocator getByteBuffAllocator() { 1504 if (data instanceof HFileBlock) { 1505 return ((HFileBlock) data).getByteBuffAllocator(); 1506 } 1507 return ByteBuffAllocator.HEAP; 1508 } 1509 1510 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1511 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1512 ByteBuffer metaBuff) throws IOException { 1513 int len = data.getSerializedLength(); 1514 // This cacheable thing can't be serialized 1515 if (len == 0) { 1516 return null; 1517 } 1518 long offset = alloc.allocateBlock(len); 1519 boolean succ = false; 1520 BucketEntry bucketEntry = null; 1521 try { 1522 bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, 1523 
getByteBuffAllocator()); 1524 bucketEntry.setDeserializerReference(data.getDeserializer()); 1525 if (data instanceof HFileBlock) { 1526 // If an instance of HFileBlock, save on some allocations. 1527 HFileBlock block = (HFileBlock) data; 1528 ByteBuff sliceBuf = block.getBufferReadOnly(); 1529 block.getMetaData(metaBuff); 1530 ioEngine.write(sliceBuf, offset); 1531 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 1532 } else { 1533 // Only used for testing. 1534 ByteBuffer bb = ByteBuffer.allocate(len); 1535 data.serialize(bb, true); 1536 ioEngine.write(bb, offset); 1537 } 1538 succ = true; 1539 } finally { 1540 if (!succ) { 1541 alloc.freeBlock(offset); 1542 } 1543 } 1544 realCacheSize.add(len); 1545 return bucketEntry; 1546 } 1547 } 1548 1549 /** 1550 * Only used in test n 1551 */ 1552 void stopWriterThreads() throws InterruptedException { 1553 for (WriterThread writerThread : writerThreads) { 1554 writerThread.disableWriter(); 1555 writerThread.interrupt(); 1556 writerThread.join(); 1557 } 1558 } 1559 1560 @Override 1561 public Iterator<CachedBlock> iterator() { 1562 // Don't bother with ramcache since stuff is in here only a little while. 1563 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 1564 return new Iterator<CachedBlock>() { 1565 private final long now = System.nanoTime(); 1566 1567 @Override 1568 public boolean hasNext() { 1569 return i.hasNext(); 1570 } 1571 1572 @Override 1573 public CachedBlock next() { 1574 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1575 return new CachedBlock() { 1576 @Override 1577 public String toString() { 1578 return BlockCacheUtil.toString(this, now); 1579 } 1580 1581 @Override 1582 public BlockPriority getBlockPriority() { 1583 return e.getValue().getPriority(); 1584 } 1585 1586 @Override 1587 public BlockType getBlockType() { 1588 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 
1589 return null; 1590 } 1591 1592 @Override 1593 public long getOffset() { 1594 return e.getKey().getOffset(); 1595 } 1596 1597 @Override 1598 public long getSize() { 1599 return e.getValue().getLength(); 1600 } 1601 1602 @Override 1603 public long getCachedTime() { 1604 return e.getValue().getCachedTime(); 1605 } 1606 1607 @Override 1608 public String getFilename() { 1609 return e.getKey().getHfileName(); 1610 } 1611 1612 @Override 1613 public int compareTo(CachedBlock other) { 1614 int diff = this.getFilename().compareTo(other.getFilename()); 1615 if (diff != 0) return diff; 1616 1617 diff = Long.compare(this.getOffset(), other.getOffset()); 1618 if (diff != 0) return diff; 1619 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1620 throw new IllegalStateException( 1621 "" + this.getCachedTime() + ", " + other.getCachedTime()); 1622 } 1623 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1624 } 1625 1626 @Override 1627 public int hashCode() { 1628 return e.getKey().hashCode(); 1629 } 1630 1631 @Override 1632 public boolean equals(Object obj) { 1633 if (obj instanceof CachedBlock) { 1634 CachedBlock cb = (CachedBlock) obj; 1635 return compareTo(cb) == 0; 1636 } else { 1637 return false; 1638 } 1639 } 1640 }; 1641 } 1642 1643 @Override 1644 public void remove() { 1645 throw new UnsupportedOperationException(); 1646 } 1647 }; 1648 } 1649 1650 @Override 1651 public BlockCache[] getBlockCaches() { 1652 return null; 1653 } 1654 1655 public int getRpcRefCount(BlockCacheKey cacheKey) { 1656 BucketEntry bucketEntry = backingMap.get(cacheKey); 1657 if (bucketEntry != null) { 1658 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1); 1659 } 1660 return 0; 1661 } 1662 1663 float getAcceptableFactor() { 1664 return acceptableFactor; 1665 } 1666 1667 float getMinFactor() { 1668 return minFactor; 1669 } 1670 1671 float getExtraFreeFactor() { 1672 return extraFreeFactor; 1673 } 1674 1675 float getSingleFactor() { 1676 return singleFactor; 1677 } 1678 1679 float getMultiFactor() { 1680 return multiFactor; 1681 } 1682 1683 float getMemoryFactor() { 1684 return memoryFactor; 1685 } 1686 1687 /** 1688 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 1689 */ 1690 static class RAMCache { 1691 /** 1692 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 1693 * {@link RAMCache#get(BlockCacheKey)} and 1694 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to guarantee 1695 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the 1696 * func method can execute exactly once only when the key is present(or absent) and under the 1697 * lock context. Otherwise, the reference count of block will be messed up. Notice that the 1698 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 1699 */ 1700 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1701 1702 public boolean containsKey(BlockCacheKey key) { 1703 return delegate.containsKey(key); 1704 } 1705 1706 public RAMQueueEntry get(BlockCacheKey key) { 1707 return delegate.computeIfPresent(key, (k, re) -> { 1708 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 1709 // atomic, another thread may remove and release the block, when retaining in this thread we 1710 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 1711 re.getData().retain(); 1712 return re; 1713 }); 1714 } 1715 1716 /** 1717 * Return the previous associated value, or null if absent. 
It has the same meaning as 1718 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1719 */ 1720 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1721 AtomicBoolean absent = new AtomicBoolean(false); 1722 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1723 // The RAMCache reference to this entry, so reference count should be increment. 1724 entry.getData().retain(); 1725 absent.set(true); 1726 return entry; 1727 }); 1728 return absent.get() ? null : re; 1729 } 1730 1731 public boolean remove(BlockCacheKey key) { 1732 return remove(key, re -> { 1733 }); 1734 } 1735 1736 /** 1737 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 1738 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 1739 * exception. the consumer will access entry to remove before release its reference count. 1740 * Notice, don't change its reference count in the {@link Consumer} 1741 */ 1742 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1743 RAMQueueEntry previous = delegate.remove(key); 1744 action.accept(previous); 1745 if (previous != null) { 1746 previous.getData().release(); 1747 } 1748 return previous != null; 1749 } 1750 1751 public boolean isEmpty() { 1752 return delegate.isEmpty(); 1753 } 1754 1755 public void clear() { 1756 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1757 while (it.hasNext()) { 1758 RAMQueueEntry re = it.next().getValue(); 1759 it.remove(); 1760 re.getData().release(); 1761 } 1762 } 1763 } 1764}