001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.lang.ref.WeakReference; 023import java.util.EnumMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.PriorityQueue; 028import java.util.SortedSet; 029import java.util.TreeSet; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.LongAdder; 036import java.util.concurrent.locks.ReentrantLock; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.io.HeapSize; 039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.util.StringUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 
047import org.apache.hbase.thirdparty.com.google.common.base.Objects; 048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 049 050/** 051 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an 052 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a 053 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} 054 * operations. 055 * </p> 056 * Contains three levels of block priority to allow for scan-resistance and in-memory families 057 * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An 058 * in-memory column family is a column family that should be served from memory if possible): 059 * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory 060 * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()}, 061 * otherwise a block becomes a single access priority the first time it is read into this block 062 * cache. If a block is accessed again while in cache, it is marked as a multiple access priority 063 * block. This delineation of blocks is used to prevent scans from thrashing the cache adding a 064 * least-frequently-used element to the eviction algorithm. 065 * <p/> 066 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each 067 * priority will retain close to its maximum size, however, if any priority is not using its entire 068 * chunk the others are able to grow beyond their chunk size. 069 * <p/> 070 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The 071 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It 072 * is only used for pre-allocating data structures and in initial heap estimation of the map. 
 * <p>
 * The detailed constructor defines the sizes for the three priorities (their factors must total
 * 1.0, i.e. the <code>maximum size</code> defined). It also sets the levels that trigger and
 * control the eviction thread.
 * <p>
 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
 * start. An eviction run frees enough blocks to bring the size down to the <code>minimum
 * size</code>.
 * <p>
 * Eviction happens in a separate thread (or on the calling thread when the eviction thread is
 * disabled) and involves a single full scan of the map. It determines how many bytes must be freed
 * to reach the minimum size, and then while scanning determines the fewest least-recently-used
 * blocks necessary from each of the three priorities (which could total 3 times the bytes to
 * free). It then uses the priority chunk sizes to evict fairly according to the relative sizes and
 * usage.
 */
@InterfaceAudience.Private
public class LruBlockCache implements FirstLevelBlockCache {

private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

/**
 * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
 * evicting during an eviction run till the cache size is down to 80% of the total.
 */
private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

/**
 * Acceptable size of cache; an eviction run is triggered once the cache grows beyond it.
 */
private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
  "hbase.lru.blockcache.acceptable.factor";

/**
 * Hard capacity limit of cache: any put is rejected while size >= this factor * acceptable size.
 */
static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
  "hbase.lru.blockcache.hard.capacity.limit.factor";
/** Fraction of the cache reserved for single-access blocks. */
private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
  "hbase.lru.blockcache.single.percentage";
/** Fraction of the cache reserved for multiple-access blocks. */
private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
  "hbase.lru.blockcache.multi.percentage";
/** Fraction of the cache reserved for in-memory blocks. */
private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
  "hbase.lru.blockcache.memory.percentage";

/**
 * Configuration key for "in-memory force mode". When enabled, blocks in the in-memory priority
 * bucket are only evicted after the single- and multi-access buckets have been emptied, so data
 * blocks of in-memory hfiles stay cached as long as possible. Unlike inMemory, which is a
 * column-family configuration, inMemoryForceMode is a cluster-wide configuration.
 */
private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
  "hbase.lru.rs.inmemoryforcemode";

/* Default Configuration Parameters */

/* Backing Concurrent Map Configuration */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/* Eviction thresholds (fractions of the maximum size) */
private static final float DEFAULT_MIN_FACTOR = 0.95f;
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

/* Priority buckets (must total 1.0) */
private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
private static final float DEFAULT_MULTI_FACTOR = 0.50f;
private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

/* Inserts are rejected once size reaches this multiple of the acceptable size */
private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

/* Statistics thread */
private static final int STAT_THREAD_PERIOD = 60 * 5; 145 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; 146 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 147 148 /** 149 * Defined the cache map as {@link ConcurrentHashMap} here, because in 150 * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent 151 * (key, func). Besides, the func method must execute exactly once only when the key is present 152 * and under the lock context, otherwise the reference count will be messed up. Notice that the 153 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 154 */ 155 private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map; 156 157 /** Eviction lock (locked when eviction in process) */ 158 private transient final ReentrantLock evictionLock = new ReentrantLock(true); 159 160 private final long maxBlockSize; 161 162 /** Volatile boolean to track if we are in an eviction process or not */ 163 private volatile boolean evictionInProgress = false; 164 165 /** Eviction thread */ 166 private transient final EvictionThread evictionThread; 167 168 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 169 private transient final ScheduledExecutorService scheduleThreadPool = 170 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 171 .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build()); 172 173 /** Current size of cache */ 174 private final AtomicLong size; 175 176 /** Current size of data blocks */ 177 private final LongAdder dataBlockSize; 178 179 /** Current number of cached elements */ 180 private final AtomicLong elements; 181 182 /** Current number of cached data block elements */ 183 private final LongAdder dataBlockElements; 184 185 /** Cache access count (sequential ID) */ 186 private final AtomicLong count; 187 188 /** hard capacity limit */ 189 private float hardCapacityLimitFactor; 190 191 
/** Cache statistics */ 192 private final CacheStats stats; 193 194 /** Maximum allowable size of cache (block put if size > max, evict) */ 195 private long maxSize; 196 197 /** Approximate block size */ 198 private long blockSize; 199 200 /** Acceptable size of cache (no evictions if size < acceptable) */ 201 private float acceptableFactor; 202 203 /** Minimum threshold of cache (when evicting, evict until size < min) */ 204 private float minFactor; 205 206 /** Single access bucket size */ 207 private float singleFactor; 208 209 /** Multiple access bucket size */ 210 private float multiFactor; 211 212 /** In-memory bucket size */ 213 private float memoryFactor; 214 215 /** Overhead of the structure itself */ 216 private long overhead; 217 218 /** Whether in-memory hfile's data block has higher priority when evicting */ 219 private boolean forceInMemory; 220 221 /** 222 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an 223 * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache 224 */ 225 private transient BlockCache victimHandler = null; 226 227 /** 228 * Default constructor. Specify maximum size and expected average block size (approximation is 229 * fine). 230 * <p> 231 * All other factors will be calculated based on defaults specified in this class. 232 * @param maxSize maximum size of cache, in bytes 233 * @param blockSize approximate size of each block, in bytes 234 */ 235 public LruBlockCache(long maxSize, long blockSize) { 236 this(maxSize, blockSize, true); 237 } 238 239 /** 240 * Constructor used for testing. Allows disabling of the eviction thread. 
241 */ 242 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { 243 this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), 244 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, 245 DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR, 246 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE); 247 } 248 249 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { 250 this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), 251 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, 252 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 253 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 254 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), 255 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), 256 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), 257 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), 258 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), 259 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)); 260 } 261 262 public LruBlockCache(long maxSize, long blockSize, Configuration conf) { 263 this(maxSize, blockSize, true, conf); 264 } 265 266 /** 267 * Configurable constructor. Use this constructor if not using defaults. 
268 * @param maxSize maximum size of this cache, in bytes 269 * @param blockSize expected average size of blocks, in bytes 270 * @param evictionThread whether to run evictions in a bg thread or not 271 * @param mapInitialSize initial size of backing ConcurrentHashMap 272 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 273 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 274 * @param minFactor percentage of total size that eviction will evict until 275 * @param acceptableFactor percentage of total size that triggers eviction 276 * @param singleFactor percentage of total size for single-access blocks 277 * @param multiFactor percentage of total size for multiple-access blocks 278 * @param memoryFactor percentage of total size for in-memory blocks 279 */ 280 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, 281 float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor, 282 float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor, 283 boolean forceInMemory, long maxBlockSize) { 284 this.maxBlockSize = maxBlockSize; 285 if ( 286 singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0 287 || memoryFactor < 0 288 ) { 289 throw new IllegalArgumentException( 290 "Single, multi, and memory factors " + " should be non-negative and total 1.0"); 291 } 292 if (minFactor >= acceptableFactor) { 293 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 294 } 295 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 296 throw new IllegalArgumentException("all factors must be < 1"); 297 } 298 this.maxSize = maxSize; 299 this.blockSize = blockSize; 300 this.forceInMemory = forceInMemory; 301 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 302 this.minFactor = minFactor; 303 this.acceptableFactor = acceptableFactor; 304 this.singleFactor = 
singleFactor; 305 this.multiFactor = multiFactor; 306 this.memoryFactor = memoryFactor; 307 this.stats = new CacheStats(this.getClass().getSimpleName()); 308 this.count = new AtomicLong(0); 309 this.elements = new AtomicLong(0); 310 this.dataBlockElements = new LongAdder(); 311 this.dataBlockSize = new LongAdder(); 312 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 313 this.size = new AtomicLong(this.overhead); 314 this.hardCapacityLimitFactor = hardLimitFactor; 315 if (evictionThread) { 316 this.evictionThread = new EvictionThread(this); 317 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 318 } else { 319 this.evictionThread = null; 320 } 321 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 322 // every five minutes. 323 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 324 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 325 } 326 327 @Override 328 public void setVictimCache(BlockCache victimCache) { 329 if (victimHandler != null) { 330 throw new IllegalArgumentException("The victim cache has already been set"); 331 } 332 victimHandler = requireNonNull(victimCache); 333 } 334 335 @Override 336 public void setMaxSize(long maxSize) { 337 this.maxSize = maxSize; 338 if (this.size.get() > acceptableSize() && !evictionInProgress) { 339 runEviction(); 340 } 341 } 342 343 /** 344 * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap 345 * access will be more faster then off-heap, the small index block or meta block cached in 346 * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always 347 * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the 348 * heap size will be messed up. Here we will clone the block into an heap block if it's an 349 * off-heap block, otherwise just use the original block. 
The key point is maintain the refCnt of 350 * the block (HBASE-22127): <br> 351 * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 352 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 353 * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by 354 * JVM, so need a retain here. 355 * @param buf the original block 356 * @return an block with an heap memory backend. 357 */ 358 private Cacheable asReferencedHeapBlock(Cacheable buf) { 359 if (buf instanceof HFileBlock) { 360 HFileBlock blk = ((HFileBlock) buf); 361 if (blk.isSharedMem()) { 362 return HFileBlock.deepCloneOnHeap(blk); 363 } 364 } 365 // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. 366 return buf.retain(); 367 } 368 369 // BlockCache implementation 370 371 /** 372 * Cache the block with the specified name and buffer. 373 * <p> 374 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 375 * this can happen, for which we compare the buffer contents. 376 * @param cacheKey block's cache key 377 * @param buf block buffer 378 * @param inMemory if block is in-memory 379 */ 380 @Override 381 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 382 if (buf.heapSize() > maxBlockSize) { 383 // If there are a lot of blocks that are too 384 // big this can make the logs way too noisy. 
385 // So we log 2% 386 if (stats.failInsert() % 50 == 0) { 387 LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ " 388 + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than " 389 + maxBlockSize); 390 } 391 return; 392 } 393 394 LruCachedBlock cb = map.get(cacheKey); 395 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 396 return; 397 } 398 long currentSize = size.get(); 399 long currentAcceptableSize = acceptableSize(); 400 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 401 if (currentSize >= hardLimitSize) { 402 stats.failInsert(); 403 if (LOG.isTraceEnabled()) { 404 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 405 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 406 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 407 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 408 } 409 if (!evictionInProgress) { 410 runEviction(); 411 } 412 return; 413 } 414 // Ensure that the block is an heap one. 415 buf = asReferencedHeapBlock(buf); 416 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 417 long newSize = updateSizeMetrics(cb, false); 418 map.put(cacheKey, cb); 419 long val = elements.incrementAndGet(); 420 if (buf.getBlockType().isData()) { 421 dataBlockElements.increment(); 422 } 423 if (LOG.isTraceEnabled()) { 424 long size = map.size(); 425 assertCounterSanity(size, val); 426 } 427 if (newSize > currentAcceptableSize && !evictionInProgress) { 428 runEviction(); 429 } 430 } 431 432 /** 433 * Sanity-checking for parity between actual block cache content and metrics. Intended only for 434 * use with TRACE level logging and -ea JVM. 435 */ 436 private static void assertCounterSanity(long mapSize, long counterVal) { 437 if (counterVal < 0) { 438 LOG.trace("counterVal overflow. Assertions unreliable. 
counterVal=" + counterVal 439 + ", mapSize=" + mapSize); 440 return; 441 } 442 if (mapSize < Integer.MAX_VALUE) { 443 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 444 if (pct_diff > 0.05) { 445 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal 446 + ", mapSize=" + mapSize); 447 } 448 } 449 } 450 451 /** 452 * Cache the block with the specified name and buffer. 453 * <p> 454 * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache 455 * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an 456 * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, 457 * otherwise the caching size is based on off-heap. 458 * @param cacheKey block's cache key 459 * @param buf block buffer 460 */ 461 @Override 462 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 463 cacheBlock(cacheKey, buf, false); 464 } 465 466 /** 467 * Helper function that updates the local size counter and also updates any per-cf or 468 * per-blocktype metrics it can discern from given {@link LruCachedBlock} 469 */ 470 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 471 long heapsize = cb.heapSize(); 472 BlockType bt = cb.getBuffer().getBlockType(); 473 if (evict) { 474 heapsize *= -1; 475 } 476 if (bt != null && bt.isData()) { 477 dataBlockSize.add(heapsize); 478 } 479 return size.addAndGet(heapsize); 480 } 481 482 /** 483 * Get the buffer of the block with the specified name. 
484 * @param cacheKey block's cache key 485 * @param caching true if the caller caches blocks on cache misses 486 * @param repeat Whether this is a repeat lookup for the same block (used to avoid 487 * double counting cache misses when doing double-check locking) 488 * @param updateCacheMetrics Whether to update cache metrics or not 489 * @return buffer of specified cache key, or null if not in cache 490 */ 491 @Override 492 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 493 boolean updateCacheMetrics) { 494 LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { 495 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 496 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 497 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 498 // see HBASE-22422. 499 val.getBuffer().retain(); 500 return val; 501 }); 502 if (cb == null) { 503 if (!repeat && updateCacheMetrics) { 504 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 505 } 506 // If there is another block cache then try and read there. 507 // However if this is a retry ( second time in double checked locking ) 508 // And it's already a miss then the l2 will also be a miss. 509 if (victimHandler != null && !repeat) { 510 // The handler will increase result's refCnt for RPC, so need no extra retain. 511 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 512 // Promote this to L1. 
513 if (result != null) { 514 if (caching) { 515 cacheBlock(cacheKey, result, /* inMemory = */ false); 516 } 517 } 518 return result; 519 } 520 return null; 521 } 522 if (updateCacheMetrics) { 523 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 524 } 525 cb.access(count.incrementAndGet()); 526 return cb.getBuffer(); 527 } 528 529 /** 530 * Whether the cache contains block with specified cacheKey 531 * @return true if contains the block 532 */ 533 @Override 534 public boolean containsBlock(BlockCacheKey cacheKey) { 535 return map.containsKey(cacheKey); 536 } 537 538 @Override 539 public boolean evictBlock(BlockCacheKey cacheKey) { 540 LruCachedBlock cb = map.get(cacheKey); 541 return cb != null && evictBlock(cb, false) > 0; 542 } 543 544 /** 545 * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a 546 * linear-time search through all blocks in the cache. Ideally this should be a search in a 547 * log-access-time map. 548 * <p> 549 * This is used for evict-on-close to remove all blocks of a specific HFile. 
550 * @return the number of blocks evicted 551 */ 552 @Override 553 public int evictBlocksByHfileName(String hfileName) { 554 int numEvicted = 0; 555 for (BlockCacheKey key : map.keySet()) { 556 if (key.getHfileName().equals(hfileName)) { 557 if (evictBlock(key)) { 558 ++numEvicted; 559 } 560 } 561 } 562 if (victimHandler != null) { 563 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 564 } 565 return numEvicted; 566 } 567 568 /** 569 * Evict the block, and it will be cached by the victim handler if exists && block may be 570 * read again later 571 * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread 572 * @return the heap size of evicted block 573 */ 574 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 575 LruCachedBlock previous = map.remove(block.getCacheKey()); 576 if (previous == null) { 577 return 0; 578 } 579 updateSizeMetrics(block, true); 580 long val = elements.decrementAndGet(); 581 if (LOG.isTraceEnabled()) { 582 long size = map.size(); 583 assertCounterSanity(size, val); 584 } 585 if (block.getBuffer().getBlockType().isData()) { 586 dataBlockElements.decrement(); 587 } 588 if (evictedByEvictionProcess) { 589 // When the eviction of the block happened because of invalidation of HFiles, no need to 590 // update the stats counter. 591 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 592 if (victimHandler != null) { 593 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 594 } 595 } 596 // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO 597 // NOT move this up because if do that then the victimHandler may access the buffer with 598 // refCnt = 0 which is disallowed. 599 previous.getBuffer().release(); 600 return block.heapSize(); 601 } 602 603 /** 604 * Multi-threaded call to run the eviction process. 
 */
private void runEviction() {
  // No background thread (or it has stopped): evict inline on the caller's thread.
  if (evictionThread == null || !evictionThread.isGo()) {
    evict();
  } else {
    evictionThread.evict();
  }
}

/** @return true while an eviction run holds the eviction lock */
boolean isEvictionInProgress() {
  return evictionInProgress;
}

/** @return the fixed overhead of the cache structure itself, in bytes */
long getOverhead() {
  return overhead;
}

/**
 * Eviction method. Frees enough blocks to bring the cache size down to minSize(), spreading the
 * eviction across the single, multi and memory priority buckets. Returns immediately if another
 * eviction is already running. The eviction-run counter (stats.evict()) is bumped on every call
 * that acquires the lock, even when nothing needs to be freed.
 */
void evict() {

  // Ensure only one eviction at a time: tryLock does not wait, a concurrent caller just returns.
  if (!evictionLock.tryLock()) {
    return;
  }

  try {
    evictionInProgress = true;
    long currentSize = this.size.get();
    long bytesToFree = currentSize - minSize();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Block cache LRU eviction started; Attempting to free "
        + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
    }

    if (bytesToFree <= 0) {
      return;
    }

    // Instantiate priority buckets
    BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
    BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
    BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

    // Scan entire map putting into appropriate buckets
    for (LruCachedBlock cachedBlock : map.values()) {
      switch (cachedBlock.getPriority()) {
        case SINGLE: {
          bucketSingle.add(cachedBlock);
          break;
        }
        case MULTI: {
          bucketMulti.add(cachedBlock);
          break;
        }
        case MEMORY: {
          bucketMemory.add(cachedBlock);
          break;
        }
      }
    }

    long bytesFreed = 0;
    // In force mode (or when essentially the whole cache is reserved for in-memory blocks) the
    // memory bucket is only touched once the single and multi buckets cannot free enough.
    if (forceInMemory || memoryFactor > 0.999f) {
      long s = bucketSingle.totalSize();
      long m = bucketMulti.totalSize();
      if (bytesToFree > (s + m)) {
        // this means we need to evict blocks in memory bucket to make room,
        // so the single and multi buckets will be emptied
        bytesFreed = bucketSingle.free(s);
        bytesFreed += bucketMulti.free(m);
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
        }
        bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
        }
      } else {
        // this means no need to evict block in memory bucket,
        // and we try best to make the ratio between single-bucket and
        // multi-bucket is 1:2
        // bytesRemain is what single + multi should hold after this run; the target split is
        // single <= bytesRemain / 3 and multi <= 2 * bytesRemain / 3.
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
          // single-bucket is small enough that no eviction happens for it
          // hence all eviction goes from multi-bucket
          bytesFreed = bucketMulti.free(bytesToFree);
        } else if (3 * m <= 2 * bytesRemain) {
          // multi-bucket is small enough that no eviction happens for it
          // hence all eviction goes from single-bucket
          bytesFreed = bucketSingle.free(bytesToFree);
        } else {
          // both buckets need to evict some blocks
          // Shrink single down to its 1/3 share first, then take the rest from multi.
          bytesFreed = bucketSingle.free(s - bytesRemain / 3);
          if (bytesFreed < bytesToFree) {
            bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
          }
        }
      }
    } else {
      // Buckets are polled in ascending order of overflow (see BlockBucket#compareTo). Each
      // overflowing bucket frees the smaller of its overflow and an equal share of what is still
      // left to free, so later (more overflowing) buckets absorb any shortfall.
      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();

      BlockBucket bucket;
      while ((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
            Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
    }
    if (LOG.isTraceEnabled()) {
      // Note: bucket totalSize values reflect the scan, not what remains after freeing.
      long single = bucketSingle.totalSize();
      long multi = bucketMulti.totalSize();
      long memory = bucketMemory.totalSize();
      LOG.trace(
        "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed)
          + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single="
          + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", "
          + "memory=" + StringUtils.byteDesc(memory));
    }
  } finally {
    stats.evict();
    evictionInProgress = false;
    evictionLock.unlock();
  }
}

@Override
public String toString() {
  return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
    .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
    .add("freeSize", StringUtils.byteDesc(getFreeSize()))
    .add("maxSize", StringUtils.byteDesc(getMaxSize()))
    .add("heapSize", StringUtils.byteDesc(heapSize()))
    .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor)
    .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor)
    .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor)
    .toString();
}

/**
 * Used to group blocks into priority buckets. There will be a BlockBucket for each priority
 * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of
 * elements out of each according to configuration parameters and their relative sizes.
762 */ 763 private class BlockBucket implements Comparable<BlockBucket> { 764 765 private final String name; 766 private LruCachedBlockQueue queue; 767 private long totalSize = 0; 768 private long bucketSize; 769 770 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 771 this.name = name; 772 this.bucketSize = bucketSize; 773 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 774 totalSize = 0; 775 } 776 777 public void add(LruCachedBlock block) { 778 totalSize += block.heapSize(); 779 queue.add(block); 780 } 781 782 public long free(long toFree) { 783 if (LOG.isTraceEnabled()) { 784 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 785 } 786 LruCachedBlock cb; 787 long freedBytes = 0; 788 while ((cb = queue.pollLast()) != null) { 789 freedBytes += evictBlock(cb, true); 790 if (freedBytes >= toFree) { 791 return freedBytes; 792 } 793 } 794 if (LOG.isTraceEnabled()) { 795 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 796 } 797 return freedBytes; 798 } 799 800 public long overflow() { 801 return totalSize - bucketSize; 802 } 803 804 public long totalSize() { 805 return totalSize; 806 } 807 808 @Override 809 public int compareTo(BlockBucket that) { 810 return Long.compare(this.overflow(), that.overflow()); 811 } 812 813 @Override 814 public boolean equals(Object that) { 815 if (that == null || !(that instanceof BlockBucket)) { 816 return false; 817 } 818 return compareTo((BlockBucket) that) == 0; 819 } 820 821 @Override 822 public int hashCode() { 823 return Objects.hashCode(name, bucketSize, queue, totalSize); 824 } 825 826 @Override 827 public String toString() { 828 return MoreObjects.toStringHelper(this).add("name", name) 829 .add("totalSize", StringUtils.byteDesc(totalSize)) 830 .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString(); 831 } 832 } 833 834 /** 835 * Get the maximum size of this cache. 
836 * @return max size in bytes 837 */ 838 839 @Override 840 public long getMaxSize() { 841 return this.maxSize; 842 } 843 844 @Override 845 public long getCurrentSize() { 846 return this.size.get(); 847 } 848 849 @Override 850 public long getCurrentDataSize() { 851 return this.dataBlockSize.sum(); 852 } 853 854 @Override 855 public long getFreeSize() { 856 return getMaxSize() - getCurrentSize(); 857 } 858 859 @Override 860 public long size() { 861 return getMaxSize(); 862 } 863 864 @Override 865 public long getBlockCount() { 866 return this.elements.get(); 867 } 868 869 @Override 870 public long getDataBlockCount() { 871 return this.dataBlockElements.sum(); 872 } 873 874 EvictionThread getEvictionThread() { 875 return this.evictionThread; 876 } 877 878 /* 879 * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows 880 * above the acceptable level.<p> Thread is triggered into action by {@link 881 * LruBlockCache#runEviction()} 882 */ 883 static class EvictionThread extends Thread { 884 885 private WeakReference<LruBlockCache> cache; 886 private volatile boolean go = true; 887 // flag set after enter the run method, used for test 888 private boolean enteringRun = false; 889 890 public EvictionThread(LruBlockCache cache) { 891 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 892 setDaemon(true); 893 this.cache = new WeakReference<>(cache); 894 } 895 896 @Override 897 public void run() { 898 enteringRun = true; 899 while (this.go) { 900 synchronized (this) { 901 try { 902 this.wait(1000 * 10/* Don't wait for ever */); 903 } catch (InterruptedException e) { 904 LOG.warn("Interrupted eviction thread ", e); 905 Thread.currentThread().interrupt(); 906 } 907 } 908 LruBlockCache cache = this.cache.get(); 909 if (cache == null) { 910 this.go = false; 911 break; 912 } 913 cache.evict(); 914 } 915 } 916 917 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", 918 justification = "This 
is what we want") 919 public void evict() { 920 synchronized (this) { 921 this.notifyAll(); 922 } 923 } 924 925 synchronized void shutdown() { 926 this.go = false; 927 this.notifyAll(); 928 } 929 930 public boolean isGo() { 931 return go; 932 } 933 934 /** 935 * Used for the test. 936 */ 937 boolean isEnteringRun() { 938 return this.enteringRun; 939 } 940 } 941 942 /* 943 * Statistics thread. Periodically prints the cache statistics to the log. 944 */ 945 static class StatisticsThread extends Thread { 946 947 private final LruBlockCache lru; 948 949 public StatisticsThread(LruBlockCache lru) { 950 super("LruBlockCacheStats"); 951 setDaemon(true); 952 this.lru = lru; 953 } 954 955 @Override 956 public void run() { 957 lru.logStats(); 958 } 959 } 960 961 public void logStats() { 962 // Log size 963 long totalSize = heapSize(); 964 long freeSize = maxSize - totalSize; 965 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" 966 + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " 967 + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " 968 + "hits=" + stats.getHitCount() + ", " + "hitRatio=" 969 + (stats.getHitCount() == 0 970 ? "0" 971 : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ")) 972 + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" 973 + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" 974 + (stats.getHitCachingCount() == 0 975 ? "0," 976 : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) 977 + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " 978 + "evictedPerRun=" + stats.evictedPerEviction()); 979 } 980 981 /** 982 * Get counter statistics for this cache. 983 * <p> 984 * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. 
985 */ 986 @Override 987 public CacheStats getStats() { 988 return this.stats; 989 } 990 991 public final static long CACHE_FIXED_OVERHEAD = 992 ClassSize.estimateBase(LruBlockCache.class, false); 993 994 @Override 995 public long heapSize() { 996 return getCurrentSize(); 997 } 998 999 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 1000 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 1001 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 1002 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 1003 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 1004 } 1005 1006 @Override 1007 public Iterator<CachedBlock> iterator() { 1008 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 1009 1010 return new Iterator<CachedBlock>() { 1011 private final long now = System.nanoTime(); 1012 1013 @Override 1014 public boolean hasNext() { 1015 return iterator.hasNext(); 1016 } 1017 1018 @Override 1019 public CachedBlock next() { 1020 final LruCachedBlock b = iterator.next(); 1021 return new CachedBlock() { 1022 @Override 1023 public String toString() { 1024 return BlockCacheUtil.toString(this, now); 1025 } 1026 1027 @Override 1028 public BlockPriority getBlockPriority() { 1029 return b.getPriority(); 1030 } 1031 1032 @Override 1033 public BlockType getBlockType() { 1034 return b.getBuffer().getBlockType(); 1035 } 1036 1037 @Override 1038 public long getOffset() { 1039 return b.getCacheKey().getOffset(); 1040 } 1041 1042 @Override 1043 public long getSize() { 1044 return b.getBuffer().heapSize(); 1045 } 1046 1047 @Override 1048 public long getCachedTime() { 1049 return b.getCachedTime(); 1050 } 1051 1052 @Override 1053 public String getFilename() { 1054 return b.getCacheKey().getHfileName(); 1055 } 1056 1057 @Override 1058 public int compareTo(CachedBlock other) { 1059 int diff = this.getFilename().compareTo(other.getFilename()); 1060 if (diff != 0) { 1061 return diff; 1062 } 
1063 diff = Long.compare(this.getOffset(), other.getOffset()); 1064 if (diff != 0) { 1065 return diff; 1066 } 1067 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1068 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1069 } 1070 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1071 } 1072 1073 @Override 1074 public int hashCode() { 1075 return b.hashCode(); 1076 } 1077 1078 @Override 1079 public boolean equals(Object obj) { 1080 if (obj instanceof CachedBlock) { 1081 CachedBlock cb = (CachedBlock) obj; 1082 return compareTo(cb) == 0; 1083 } else { 1084 return false; 1085 } 1086 } 1087 }; 1088 } 1089 1090 @Override 1091 public void remove() { 1092 throw new UnsupportedOperationException(); 1093 } 1094 }; 1095 } 1096 1097 // Simple calculators of sizes given factors and maxSize 1098 1099 long acceptableSize() { 1100 return (long) Math.floor(this.maxSize * this.acceptableFactor); 1101 } 1102 1103 private long minSize() { 1104 return (long) Math.floor(this.maxSize * this.minFactor); 1105 } 1106 1107 private long singleSize() { 1108 return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1109 } 1110 1111 private long multiSize() { 1112 return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1113 } 1114 1115 private long memorySize() { 1116 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1117 } 1118 1119 @Override 1120 public void shutdown() { 1121 if (victimHandler != null) { 1122 victimHandler.shutdown(); 1123 } 1124 this.scheduleThreadPool.shutdown(); 1125 for (int i = 0; i < 10; i++) { 1126 if (!this.scheduleThreadPool.isShutdown()) { 1127 try { 1128 Thread.sleep(10); 1129 } catch (InterruptedException e) { 1130 LOG.warn("Interrupted while sleeping"); 1131 Thread.currentThread().interrupt(); 1132 break; 1133 } 1134 } 1135 } 1136 1137 if (!this.scheduleThreadPool.isShutdown()) { 1138 List<Runnable> runnables = 
this.scheduleThreadPool.shutdownNow(); 1139 LOG.debug("Still running " + runnables); 1140 } 1141 this.evictionThread.shutdown(); 1142 } 1143 1144 /** Clears the cache. Used in tests. */ 1145 public void clearCache() { 1146 this.map.clear(); 1147 this.elements.set(0); 1148 } 1149 1150 /** 1151 * Used in testing. May be very inefficient. 1152 * @return the set of cached file names 1153 */ 1154 SortedSet<String> getCachedFileNamesForTest() { 1155 SortedSet<String> fileNames = new TreeSet<>(); 1156 for (BlockCacheKey cacheKey : map.keySet()) { 1157 fileNames.add(cacheKey.getHfileName()); 1158 } 1159 return fileNames; 1160 } 1161 1162 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1163 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1164 for (LruCachedBlock block : map.values()) { 1165 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1166 Integer count = counts.get(encoding); 1167 counts.put(encoding, (count == null ? 0 : count) + 1); 1168 } 1169 return counts; 1170 } 1171 1172 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1173 return map; 1174 } 1175 1176 @Override 1177 public BlockCache[] getBlockCaches() { 1178 if (victimHandler != null) { 1179 return new BlockCache[] { this, this.victimHandler }; 1180 } 1181 return null; 1182 } 1183}