001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.lang.ref.WeakReference; 023import java.util.EnumMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.PriorityQueue; 028import java.util.SortedSet; 029import java.util.TreeSet; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.LongAdder; 036import java.util.concurrent.locks.ReentrantLock; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.io.HeapSize; 039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.util.StringUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
 * operations.
 * <p>
 * Contains three levels of block priority to allow for scan-resistance and in-memory families
 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (an in-memory column
 * family is a column family that should be served from memory if possible): single-access,
 * multiple-access, and in-memory priority. A block is added with an in-memory priority flag if
 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
 * single-access priority block the first time it is read into this block cache. If a block is
 * accessed again while in cache, it is marked as a multiple-access priority block. This
 * delineation of blocks is used to prevent scans from thrashing the cache, adding a
 * least-frequently-used element to the eviction algorithm.
 * <p>
 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
 * priority will retain close to its maximum size; however, if any priority is not using its entire
 * chunk, the others are able to grow beyond their chunk size.
 * <p>
 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
 * is only used for pre-allocating data structures and in initial heap estimation of the map.
 * <p>
 * The detailed constructor defines the shares of the three priorities as single, multi and memory
 * factors, which must be non-negative and sum to 1.0 (together they cover the whole
 * <code>maximum size</code>). It also sets the levels that trigger and control the eviction
 * process.
 * <p>
 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
 * start. It evicts enough blocks to get the size below the minimum size specified. Independently,
 * once the current size reaches the hard capacity limit (a multiple of the acceptable size), new
 * blocks are rejected and an eviction is requested instead.
 * <p>
 * Eviction normally happens in a separate thread (it runs inline in the calling thread when the
 * eviction thread is disabled) and involves a single full scan of the map. It determines how many
 * bytes must be freed to reach the minimum size, and then while scanning collects, for each of the
 * three priorities, the least-recently-used blocks needed to free that amount (so up to three
 * times the bytes to free may be gathered as candidates). It then uses the priority chunk sizes to
 * evict fairly according to the relative sizes and usage.
 */
@InterfaceAudience.Private
public class LruBlockCache implements FirstLevelBlockCache {

  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Fraction of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Fraction of total size above which an eviction run is triggered (no evictions while the size
   * is below this acceptable size).
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of the cache, expressed as a multiple of the acceptable size: any put is
   * rejected while the current size is >= this factor * acceptable size.
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.hard.capacity.limit.factor";
  /** Share of the cache for single-access blocks; single + multi + memory must sum to 1.0. */
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.single.percentage";
  /** Share of the cache for multiple-access blocks; single + multi + memory must sum to 1.0. */
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.multi.percentage";
  /** Share of the cache for in-memory blocks; single + multi + memory must sum to 1.0. */
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key for the in-memory force mode. Unlike inMemory, which is a column-family
   * configuration, inMemoryForceMode is a cluster-wide configuration. When enabled, an eviction
   * run frees single- and multi-access blocks first and only evicts in-memory blocks when emptying
   * those two buckets is not enough.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
    "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  /* Hard capacity limit, as a multiple of the acceptable size */
  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread */
private static final int STAT_THREAD_PERIOD = 60 * 5; // seconds (scheduled with TimeUnit.SECONDS)
  /**
   * Config key for the maximum heap size, in bytes, of a block this cache accepts; larger blocks
   * are not cached.
   */
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  /** Default maximum block size: 16 MiB. */
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  /**
   * Defined the cache map as {@link ConcurrentHashMap} here, because in
   * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
   * (key, func). Besides, the func method must execute exactly once only when the key is present
   * and under the lock context, otherwise the reference count will be messed up. Notice that the
   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
   */
  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

  /** Eviction lock (held while an eviction run is in progress; tryLock'ed by evict()) */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  /** Blocks whose heap size exceeds this many bytes are never cached. */
  private final long maxBlockSize;

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread; null when the cache was constructed with the eviction thread disabled */
  private transient final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache, in bytes; starts at the fixed {@link #overhead} */
  private final AtomicLong size;

  /** Current size of data blocks */
  private final LongAdder dataBlockSize = new LongAdder();

  /** Current size of index blocks */
  private final LongAdder indexBlockSize = new LongAdder();

  /** Current size of bloom blocks */
  private final LongAdder bloomBlockSize = new LongAdder();

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements = new LongAdder();

  /** Current number of cached index block elements */
  private final LongAdder indexBlockElements = new LongAdder();

  /** Current number of cached bloom block elements */
  private final LongAdder bloomBlockElements = new LongAdder();

  /** Cache access count (sequential ID), used to stamp blocks on insert and on access */
  private final AtomicLong count;

  /** Hard capacity limit, as a multiple of the acceptable size (puts are rejected at or above) */
  private float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum size of the cache, in bytes; may be changed at runtime via setMaxSize */
  private long maxSize;

  /** Approximate block size */
  private long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Fraction of the cache reserved for single-access blocks */
  private float singleFactor;

  /** Fraction of the cache reserved for multiple-access blocks */
  private float multiFactor;

  /** Fraction of the cache reserved for in-memory blocks */
  private float memoryFactor;

  /** Overhead of the structure itself; the size counter is initialised to this value */
  private long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
   * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;

  /**
   * Default constructor. Specify maximum size and expected average block size (approximation is
   * fine).
   * <p>
   * All other factors will be calculated based on defaults specified in this class, and the
   * eviction thread is enabled.
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }

  /**
   * Constructor used for testing. Allows disabling of the eviction thread. The backing map is
   * pre-sized for 1.2x the expected block count (maxSize / blockSize); every other parameter uses
   * the class defaults.
   * @param maxSize        maximum size of cache, in bytes
   * @param blockSize      approximate size of each block, in bytes
   * @param evictionThread whether to run evictions in a background thread
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR,
      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE);
  }

  /**
   * Creates a cache whose eviction thresholds, bucket shares, hard limit, in-memory force mode and
   * maximum block size are read from {@code conf}, falling back to the class defaults for unset
   * keys.
   * @param maxSize        maximum size of cache, in bytes
   * @param blockSize      approximate size of each block, in bytes
   * @param evictionThread whether to run evictions in a background thread
   * @param conf           configuration to read the {@code hbase.lru.*} settings from
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
  }

  /**
   * Same as {@link #LruBlockCache(long, long, boolean, Configuration)} with the eviction thread
   * enabled.
   */
  public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }

  /**
   * Configurable constructor. Use this constructor if not using defaults.
   * @param maxSize             maximum size of this cache, in bytes
   * @param blockSize           expected average size of blocks, in bytes
   * @param evictionThread      whether to run evictions in a bg thread or not
   * @param mapInitialSize      initial size of backing ConcurrentHashMap
   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
   * @param minFactor           percentage of total size that eviction will evict until
   * @param acceptableFactor    percentage of total size that triggers eviction
   * @param singleFactor        percentage of total size for single-access blocks
   * @param multiFactor         percentage of total size for multiple-access blocks
   * @param memoryFactor        percentage of total size for in-memory blocks
   * @param hardLimitFactor     multiple of the acceptable size at or above which puts are rejected
   * @param forceInMemory       whether eviction should spare in-memory blocks until the single and
   *                            multi buckets are exhausted
   * @param maxBlockSize        blocks with a larger heap size than this are not cached
   * @throws IllegalArgumentException if the single/multi/memory factors are negative or do not sum
   *                                  to 1, if minFactor >= acceptableFactor, or if either of those
   *                                  two is >= 1
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize,
    float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor,
    float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor,
    boolean forceInMemory, long maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
    // NOTE(review): the sum is compared with exact float equality, so factor combinations whose
    // float sum rounds away from 1.0 are rejected even if they are mathematically 1.
    if (
      singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0
        || memoryFactor < 0
    ) {
      throw new IllegalArgumentException(
        "Single, multi, and memory factors " + " should be non-negative and total 1.0");
    }
    if (minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    this.forceInMemory = forceInMemory;
    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats(this.getClass().getSimpleName());
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    // The size counter starts at the structural overhead, not at zero.
    this.size = new AtomicLong(this.overhead);
    this.hardCapacityLimitFactor = hardLimitFactor;
    // Both background tasks receive `this` before construction has finished, so every field
    // they may read is assigned above this point.
    if (evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  /**
   * Sets the victim cache. It receives blocks evicted by the eviction process and is consulted on
   * cache misses. It can only be set once.
   * @throws IllegalArgumentException if a victim cache has already been set
   * @throws NullPointerException     if {@code victimCache} is null
   */
  @Override
  public void setVictimCache(BlockCache victimCache) {
    if (victimHandler != null) {
      throw new IllegalArgumentException("The victim cache has already been set");
    }
    victimHandler = requireNonNull(victimCache);
  }

  /**
   * Changes the maximum size. If the current size is already above the acceptable size and no
   * eviction is running, an eviction is started right away.
   */
  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
    if (this.size.get() > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  /**
   * The block cached in LRUBlockCache will always be a heap block. On the one hand, heap access
   * is faster than off-heap access, so the small index or meta blocks cached in CombinedBlockCache
   * benefit a lot. On the other hand, the LRUBlockCache size is always calculated from the total
   * heap size, so caching an off-heap block in LRUBlockCache would mess up the heap size. Here we
   * clone the block into a heap block if it's an off-heap (shared memory) block, otherwise we just
   * use the original block. The key point is to maintain the refCnt of the block (HBASE-22127):
   * <br>
   * 1.
if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 362 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 363 * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by 364 * JVM, so need a retain here. 365 * @param buf the original block 366 * @return an block with an heap memory backend. 367 */ 368 private Cacheable asReferencedHeapBlock(Cacheable buf) { 369 if (buf instanceof HFileBlock) { 370 HFileBlock blk = ((HFileBlock) buf); 371 if (blk.isSharedMem()) { 372 return HFileBlock.deepCloneOnHeap(blk); 373 } 374 } 375 // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. 376 return buf.retain(); 377 } 378 379 // BlockCache implementation 380 381 /** 382 * Cache the block with the specified name and buffer. 383 * <p> 384 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 385 * this can happen, for which we compare the buffer contents. 386 * @param cacheKey block's cache key 387 * @param buf block buffer 388 * @param inMemory if block is in-memory 389 */ 390 @Override 391 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 392 if (buf.heapSize() > maxBlockSize) { 393 // If there are a lot of blocks that are too 394 // big this can make the logs way too noisy. 
395 // So we log 2% 396 if (stats.failInsert() % 50 == 0) { 397 LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ " 398 + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than " 399 + maxBlockSize); 400 } 401 return; 402 } 403 404 LruCachedBlock cb = map.get(cacheKey); 405 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 406 return; 407 } 408 long currentSize = size.get(); 409 long currentAcceptableSize = acceptableSize(); 410 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 411 if (currentSize >= hardLimitSize) { 412 stats.failInsert(); 413 if (LOG.isTraceEnabled()) { 414 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 415 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 416 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 417 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 418 } 419 if (!evictionInProgress) { 420 runEviction(); 421 } 422 return; 423 } 424 // Ensure that the block is an heap one. 425 buf = asReferencedHeapBlock(buf); 426 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 427 long newSize = updateSizeMetrics(cb, false); 428 map.put(cacheKey, cb); 429 long val = elements.incrementAndGet(); 430 if (buf.getBlockType().isBloom()) { 431 bloomBlockElements.increment(); 432 } else if (buf.getBlockType().isIndex()) { 433 indexBlockElements.increment(); 434 } else if (buf.getBlockType().isData()) { 435 dataBlockElements.increment(); 436 } 437 if (LOG.isTraceEnabled()) { 438 long size = map.size(); 439 assertCounterSanity(size, val); 440 } 441 if (newSize > currentAcceptableSize && !evictionInProgress) { 442 runEviction(); 443 } 444 } 445 446 /** 447 * Sanity-checking for parity between actual block cache content and metrics. Intended only for 448 * use with TRACE level logging and -ea JVM. 
449 */ 450 private static void assertCounterSanity(long mapSize, long counterVal) { 451 if (counterVal < 0) { 452 LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal 453 + ", mapSize=" + mapSize); 454 return; 455 } 456 if (mapSize < Integer.MAX_VALUE) { 457 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 458 if (pct_diff > 0.05) { 459 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal 460 + ", mapSize=" + mapSize); 461 } 462 } 463 } 464 465 /** 466 * Cache the block with the specified name and buffer. 467 * <p> 468 * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache 469 * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an 470 * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, 471 * otherwise the caching size is based on off-heap. 472 * @param cacheKey block's cache key 473 * @param buf block buffer 474 */ 475 @Override 476 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 477 cacheBlock(cacheKey, buf, false); 478 } 479 480 /** 481 * Helper function that updates the local size counter and also updates any per-cf or 482 * per-blocktype metrics it can discern from given {@link LruCachedBlock} 483 */ 484 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 485 long heapsize = cb.heapSize(); 486 BlockType bt = cb.getBuffer().getBlockType(); 487 if (evict) { 488 heapsize *= -1; 489 } 490 if (bt != null) { 491 if (bt.isBloom()) { 492 bloomBlockSize.add(heapsize); 493 } else if (bt.isIndex()) { 494 indexBlockSize.add(heapsize); 495 } else if (bt.isData()) { 496 dataBlockSize.add(heapsize); 497 } 498 } 499 return size.addAndGet(heapsize); 500 } 501 502 /** 503 * Get the buffer of the block with the specified name. 
504 * @param cacheKey block's cache key 505 * @param caching true if the caller caches blocks on cache misses 506 * @param repeat Whether this is a repeat lookup for the same block (used to avoid 507 * double counting cache misses when doing double-check locking) 508 * @param updateCacheMetrics Whether to update cache metrics or not 509 * @return buffer of specified cache key, or null if not in cache 510 */ 511 @Override 512 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 513 boolean updateCacheMetrics) { 514 LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { 515 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 516 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 517 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 518 // see HBASE-22422. 519 val.getBuffer().retain(); 520 return val; 521 }); 522 if (cb == null) { 523 if (!repeat && updateCacheMetrics) { 524 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 525 } 526 // If there is another block cache then try and read there. 527 // However if this is a retry ( second time in double checked locking ) 528 // And it's already a miss then the l2 will also be a miss. 529 if (victimHandler != null && !repeat) { 530 // The handler will increase result's refCnt for RPC, so need no extra retain. 531 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 532 // Promote this to L1. 
533 if (result != null) { 534 if (caching) { 535 cacheBlock(cacheKey, result, /* inMemory = */ false); 536 } 537 } 538 return result; 539 } 540 return null; 541 } 542 if (updateCacheMetrics) { 543 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 544 } 545 cb.access(count.incrementAndGet()); 546 return cb.getBuffer(); 547 } 548 549 /** 550 * Whether the cache contains block with specified cacheKey 551 * @return true if contains the block 552 */ 553 @Override 554 public boolean containsBlock(BlockCacheKey cacheKey) { 555 return map.containsKey(cacheKey); 556 } 557 558 @Override 559 public boolean evictBlock(BlockCacheKey cacheKey) { 560 LruCachedBlock cb = map.get(cacheKey); 561 return cb != null && evictBlock(cb, false) > 0; 562 } 563 564 /** 565 * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a 566 * linear-time search through all blocks in the cache. Ideally this should be a search in a 567 * log-access-time map. 568 * <p> 569 * This is used for evict-on-close to remove all blocks of a specific HFile. 
570 * @return the number of blocks evicted 571 */ 572 @Override 573 public int evictBlocksByHfileName(String hfileName) { 574 int numEvicted = 0; 575 for (BlockCacheKey key : map.keySet()) { 576 if (key.getHfileName().equals(hfileName)) { 577 if (evictBlock(key)) { 578 ++numEvicted; 579 } 580 } 581 } 582 if (victimHandler != null) { 583 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 584 } 585 return numEvicted; 586 } 587 588 /** 589 * Evict the block, and it will be cached by the victim handler if exists && block may be 590 * read again later 591 * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread 592 * @return the heap size of evicted block 593 */ 594 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 595 LruCachedBlock previous = map.remove(block.getCacheKey()); 596 if (previous == null) { 597 return 0; 598 } 599 updateSizeMetrics(block, true); 600 long val = elements.decrementAndGet(); 601 if (LOG.isTraceEnabled()) { 602 long size = map.size(); 603 assertCounterSanity(size, val); 604 } 605 BlockType bt = block.getBuffer().getBlockType(); 606 if (bt.isBloom()) { 607 bloomBlockElements.decrement(); 608 } else if (bt.isIndex()) { 609 indexBlockElements.decrement(); 610 } else if (bt.isData()) { 611 dataBlockElements.decrement(); 612 } 613 if (evictedByEvictionProcess) { 614 // When the eviction of the block happened because of invalidation of HFiles, no need to 615 // update the stats counter. 616 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 617 if (victimHandler != null) { 618 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 619 } 620 } 621 // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO 622 // NOT move this up because if do that then the victimHandler may access the buffer with 623 // refCnt = 0 which is disallowed. 
624 previous.getBuffer().release(); 625 return block.heapSize(); 626 } 627 628 /** 629 * Multi-threaded call to run the eviction process. 630 */ 631 private void runEviction() { 632 if (evictionThread == null || !evictionThread.isGo()) { 633 evict(); 634 } else { 635 evictionThread.evict(); 636 } 637 } 638 639 boolean isEvictionInProgress() { 640 return evictionInProgress; 641 } 642 643 long getOverhead() { 644 return overhead; 645 } 646 647 /** 648 * Eviction method. 649 */ 650 void evict() { 651 652 // Ensure only one eviction at a time 653 if (!evictionLock.tryLock()) { 654 return; 655 } 656 657 try { 658 evictionInProgress = true; 659 long currentSize = this.size.get(); 660 long bytesToFree = currentSize - minSize(); 661 662 if (LOG.isTraceEnabled()) { 663 LOG.trace("Block cache LRU eviction started; Attempting to free " 664 + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize)); 665 } 666 667 if (bytesToFree <= 0) { 668 return; 669 } 670 671 // Instantiate priority buckets 672 BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize()); 673 BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); 674 BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); 675 676 // Scan entire map putting into appropriate buckets 677 for (LruCachedBlock cachedBlock : map.values()) { 678 switch (cachedBlock.getPriority()) { 679 case SINGLE: { 680 bucketSingle.add(cachedBlock); 681 break; 682 } 683 case MULTI: { 684 bucketMulti.add(cachedBlock); 685 break; 686 } 687 case MEMORY: { 688 bucketMemory.add(cachedBlock); 689 break; 690 } 691 } 692 } 693 694 long bytesFreed = 0; 695 if (forceInMemory || memoryFactor > 0.999f) { 696 long s = bucketSingle.totalSize(); 697 long m = bucketMulti.totalSize(); 698 if (bytesToFree > (s + m)) { 699 // this means we need to evict blocks in memory bucket to make room, 700 // so the single and multi buckets will be 
emptied 701 bytesFreed = bucketSingle.free(s); 702 bytesFreed += bucketMulti.free(m); 703 if (LOG.isTraceEnabled()) { 704 LOG.trace( 705 "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets"); 706 } 707 bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); 708 if (LOG.isTraceEnabled()) { 709 LOG.trace( 710 "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets "); 711 } 712 } else { 713 // this means no need to evict block in memory bucket, 714 // and we try best to make the ratio between single-bucket and 715 // multi-bucket is 1:2 716 long bytesRemain = s + m - bytesToFree; 717 if (3 * s <= bytesRemain) { 718 // single-bucket is small enough that no eviction happens for it 719 // hence all eviction goes from multi-bucket 720 bytesFreed = bucketMulti.free(bytesToFree); 721 } else if (3 * m <= 2 * bytesRemain) { 722 // multi-bucket is small enough that no eviction happens for it 723 // hence all eviction goes from single-bucket 724 bytesFreed = bucketSingle.free(bytesToFree); 725 } else { 726 // both buckets need to evict some blocks 727 bytesFreed = bucketSingle.free(s - bytesRemain / 3); 728 if (bytesFreed < bytesToFree) { 729 bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); 730 } 731 } 732 } 733 } else { 734 PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); 735 736 bucketQueue.add(bucketSingle); 737 bucketQueue.add(bucketMulti); 738 bucketQueue.add(bucketMemory); 739 740 int remainingBuckets = bucketQueue.size(); 741 742 BlockBucket bucket; 743 while ((bucket = bucketQueue.poll()) != null) { 744 long overflow = bucket.overflow(); 745 if (overflow > 0) { 746 long bucketBytesToFree = 747 Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); 748 bytesFreed += bucket.free(bucketBytesToFree); 749 } 750 remainingBuckets--; 751 } 752 } 753 if (LOG.isTraceEnabled()) { 754 long single = bucketSingle.totalSize(); 755 long multi = bucketMulti.totalSize(); 756 long memory = 
bucketMemory.totalSize(); 757 LOG.trace( 758 "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed) 759 + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single=" 760 + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", " 761 + "memory=" + StringUtils.byteDesc(memory)); 762 } 763 } finally { 764 stats.evict(); 765 evictionInProgress = false; 766 evictionLock.unlock(); 767 } 768 } 769 770 @Override 771 public String toString() { 772 return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount()) 773 .add("currentSize", StringUtils.byteDesc(getCurrentSize())) 774 .add("freeSize", StringUtils.byteDesc(getFreeSize())) 775 .add("maxSize", StringUtils.byteDesc(getMaxSize())) 776 .add("heapSize", StringUtils.byteDesc(heapSize())) 777 .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor) 778 .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor) 779 .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor) 780 .toString(); 781 } 782 783 /** 784 * Used to group blocks into priority buckets. There will be a BlockBucket for each priority 785 * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of 786 * elements out of each according to configuration parameters and their relatives sizes. 
787 */ 788 private class BlockBucket implements Comparable<BlockBucket> { 789 790 private final String name; 791 private LruCachedBlockQueue queue; 792 private long totalSize = 0; 793 private long bucketSize; 794 795 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 796 this.name = name; 797 this.bucketSize = bucketSize; 798 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 799 totalSize = 0; 800 } 801 802 public void add(LruCachedBlock block) { 803 totalSize += block.heapSize(); 804 queue.add(block); 805 } 806 807 public long free(long toFree) { 808 if (LOG.isTraceEnabled()) { 809 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 810 } 811 LruCachedBlock cb; 812 long freedBytes = 0; 813 while ((cb = queue.pollLast()) != null) { 814 freedBytes += evictBlock(cb, true); 815 if (freedBytes >= toFree) { 816 return freedBytes; 817 } 818 } 819 if (LOG.isTraceEnabled()) { 820 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 821 } 822 return freedBytes; 823 } 824 825 public long overflow() { 826 return totalSize - bucketSize; 827 } 828 829 public long totalSize() { 830 return totalSize; 831 } 832 833 @Override 834 public int compareTo(BlockBucket that) { 835 return Long.compare(this.overflow(), that.overflow()); 836 } 837 838 @Override 839 public boolean equals(Object that) { 840 if (that == null || !(that instanceof BlockBucket)) { 841 return false; 842 } 843 return compareTo((BlockBucket) that) == 0; 844 } 845 846 @Override 847 public int hashCode() { 848 return Objects.hashCode(name, bucketSize, queue, totalSize); 849 } 850 851 @Override 852 public String toString() { 853 return MoreObjects.toStringHelper(this).add("name", name) 854 .add("totalSize", StringUtils.byteDesc(totalSize)) 855 .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString(); 856 } 857 } 858 859 /** 860 * Get the maximum size of this cache. 
861 * @return max size in bytes 862 */ 863 864 @Override 865 public long getMaxSize() { 866 return this.maxSize; 867 } 868 869 @Override 870 public long getCurrentSize() { 871 return this.size.get(); 872 } 873 874 @Override 875 public long getCurrentDataSize() { 876 return this.dataBlockSize.sum(); 877 } 878 879 public long getCurrentIndexSize() { 880 return this.indexBlockSize.sum(); 881 } 882 883 public long getCurrentBloomSize() { 884 return this.bloomBlockSize.sum(); 885 } 886 887 @Override 888 public long getFreeSize() { 889 return getMaxSize() - getCurrentSize(); 890 } 891 892 @Override 893 public long size() { 894 return getMaxSize(); 895 } 896 897 @Override 898 public long getBlockCount() { 899 return this.elements.get(); 900 } 901 902 @Override 903 public long getDataBlockCount() { 904 return this.dataBlockElements.sum(); 905 } 906 907 public long getIndexBlockCount() { 908 return this.indexBlockElements.sum(); 909 } 910 911 public long getBloomBlockCount() { 912 return this.bloomBlockElements.sum(); 913 } 914 915 EvictionThread getEvictionThread() { 916 return this.evictionThread; 917 } 918 919 /* 920 * Eviction thread. 
Sits in waiting state until an eviction is triggered when the cache size grows 921 * above the acceptable level.<p> Thread is triggered into action by {@link 922 * LruBlockCache#runEviction()} 923 */ 924 static class EvictionThread extends Thread { 925 926 private WeakReference<LruBlockCache> cache; 927 private volatile boolean go = true; 928 // flag set after enter the run method, used for test 929 private boolean enteringRun = false; 930 931 public EvictionThread(LruBlockCache cache) { 932 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 933 setDaemon(true); 934 this.cache = new WeakReference<>(cache); 935 } 936 937 @Override 938 public void run() { 939 enteringRun = true; 940 while (this.go) { 941 synchronized (this) { 942 try { 943 this.wait(1000 * 10/* Don't wait for ever */); 944 } catch (InterruptedException e) { 945 LOG.warn("Interrupted eviction thread ", e); 946 Thread.currentThread().interrupt(); 947 } 948 } 949 LruBlockCache cache = this.cache.get(); 950 if (cache == null) { 951 this.go = false; 952 break; 953 } 954 cache.evict(); 955 } 956 } 957 958 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", 959 justification = "This is what we want") 960 public void evict() { 961 synchronized (this) { 962 this.notifyAll(); 963 } 964 } 965 966 synchronized void shutdown() { 967 this.go = false; 968 this.notifyAll(); 969 } 970 971 public boolean isGo() { 972 return go; 973 } 974 975 /** 976 * Used for the test. 977 */ 978 boolean isEnteringRun() { 979 return this.enteringRun; 980 } 981 } 982 983 /* 984 * Statistics thread. Periodically prints the cache statistics to the log. 
985 */ 986 static class StatisticsThread extends Thread { 987 988 private final LruBlockCache lru; 989 990 public StatisticsThread(LruBlockCache lru) { 991 super("LruBlockCacheStats"); 992 setDaemon(true); 993 this.lru = lru; 994 } 995 996 @Override 997 public void run() { 998 lru.logStats(); 999 } 1000 } 1001 1002 public void logStats() { 1003 // Log size 1004 long totalSize = heapSize(); 1005 long freeSize = maxSize - totalSize; 1006 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" 1007 + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " 1008 + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " 1009 + "hits=" + stats.getHitCount() + ", " + "hitRatio=" 1010 + (stats.getHitCount() == 0 1011 ? "0" 1012 : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ")) 1013 + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" 1014 + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" 1015 + (stats.getHitCachingCount() == 0 1016 ? "0," 1017 : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) 1018 + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " 1019 + "evictedPerRun=" + stats.evictedPerEviction()); 1020 } 1021 1022 /** 1023 * Get counter statistics for this cache. 1024 * <p> 1025 * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. 
1026 */ 1027 @Override 1028 public CacheStats getStats() { 1029 return this.stats; 1030 } 1031 1032 public final static long CACHE_FIXED_OVERHEAD = 1033 ClassSize.estimateBase(LruBlockCache.class, false); 1034 1035 @Override 1036 public long heapSize() { 1037 return getCurrentSize(); 1038 } 1039 1040 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 1041 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 1042 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 1043 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 1044 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 1045 } 1046 1047 @Override 1048 public Iterator<CachedBlock> iterator() { 1049 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 1050 1051 return new Iterator<CachedBlock>() { 1052 private final long now = System.nanoTime(); 1053 1054 @Override 1055 public boolean hasNext() { 1056 return iterator.hasNext(); 1057 } 1058 1059 @Override 1060 public CachedBlock next() { 1061 final LruCachedBlock b = iterator.next(); 1062 return new CachedBlock() { 1063 @Override 1064 public String toString() { 1065 return BlockCacheUtil.toString(this, now); 1066 } 1067 1068 @Override 1069 public BlockPriority getBlockPriority() { 1070 return b.getPriority(); 1071 } 1072 1073 @Override 1074 public BlockType getBlockType() { 1075 return b.getBuffer().getBlockType(); 1076 } 1077 1078 @Override 1079 public long getOffset() { 1080 return b.getCacheKey().getOffset(); 1081 } 1082 1083 @Override 1084 public long getSize() { 1085 return b.getBuffer().heapSize(); 1086 } 1087 1088 @Override 1089 public long getCachedTime() { 1090 return b.getCachedTime(); 1091 } 1092 1093 @Override 1094 public String getFilename() { 1095 return b.getCacheKey().getHfileName(); 1096 } 1097 1098 @Override 1099 public int compareTo(CachedBlock other) { 1100 int diff = this.getFilename().compareTo(other.getFilename()); 1101 if (diff != 0) { 1102 
return diff; 1103 } 1104 diff = Long.compare(this.getOffset(), other.getOffset()); 1105 if (diff != 0) { 1106 return diff; 1107 } 1108 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1109 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1110 } 1111 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1112 } 1113 1114 @Override 1115 public int hashCode() { 1116 return b.hashCode(); 1117 } 1118 1119 @Override 1120 public boolean equals(Object obj) { 1121 if (obj instanceof CachedBlock) { 1122 CachedBlock cb = (CachedBlock) obj; 1123 return compareTo(cb) == 0; 1124 } else { 1125 return false; 1126 } 1127 } 1128 }; 1129 } 1130 1131 @Override 1132 public void remove() { 1133 throw new UnsupportedOperationException(); 1134 } 1135 }; 1136 } 1137 1138 // Simple calculators of sizes given factors and maxSize 1139 1140 long acceptableSize() { 1141 return (long) Math.floor(this.maxSize * this.acceptableFactor); 1142 } 1143 1144 private long minSize() { 1145 return (long) Math.floor(this.maxSize * this.minFactor); 1146 } 1147 1148 private long singleSize() { 1149 return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1150 } 1151 1152 private long multiSize() { 1153 return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1154 } 1155 1156 private long memorySize() { 1157 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1158 } 1159 1160 @Override 1161 public void shutdown() { 1162 if (victimHandler != null) { 1163 victimHandler.shutdown(); 1164 } 1165 this.scheduleThreadPool.shutdown(); 1166 for (int i = 0; i < 10; i++) { 1167 if (!this.scheduleThreadPool.isShutdown()) { 1168 try { 1169 Thread.sleep(10); 1170 } catch (InterruptedException e) { 1171 LOG.warn("Interrupted while sleeping"); 1172 Thread.currentThread().interrupt(); 1173 break; 1174 } 1175 } 1176 } 1177 1178 if (!this.scheduleThreadPool.isShutdown()) { 1179 List<Runnable> runnables = 
this.scheduleThreadPool.shutdownNow(); 1180 LOG.debug("Still running " + runnables); 1181 } 1182 this.evictionThread.shutdown(); 1183 } 1184 1185 /** Clears the cache. Used in tests. */ 1186 public void clearCache() { 1187 this.map.clear(); 1188 this.elements.set(0); 1189 } 1190 1191 /** 1192 * Used in testing. May be very inefficient. 1193 * @return the set of cached file names 1194 */ 1195 SortedSet<String> getCachedFileNamesForTest() { 1196 SortedSet<String> fileNames = new TreeSet<>(); 1197 for (BlockCacheKey cacheKey : map.keySet()) { 1198 fileNames.add(cacheKey.getHfileName()); 1199 } 1200 return fileNames; 1201 } 1202 1203 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1204 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1205 for (LruCachedBlock block : map.values()) { 1206 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1207 Integer count = counts.get(encoding); 1208 counts.put(encoding, (count == null ? 0 : count) + 1); 1209 } 1210 return counts; 1211 } 1212 1213 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1214 return map; 1215 } 1216 1217 @Override 1218 public BlockCache[] getBlockCaches() { 1219 if (victimHandler != null) { 1220 return new BlockCache[] { this, this.victimHandler }; 1221 } 1222 return null; 1223 } 1224}