001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.lang.ref.WeakReference; 023import java.util.EnumMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.PriorityQueue; 028import java.util.SortedSet; 029import java.util.TreeSet; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.LongAdder; 036import java.util.concurrent.locks.ReentrantLock; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.io.HeapSize; 039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.util.StringUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 
047import org.apache.hbase.thirdparty.com.google.common.base.Objects; 048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 049 050/** 051 * A block cache implementation that is memory-aware using {@link HeapSize}, 052 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a 053 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 054 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p> 055 * 056 * Contains three levels of block priority to allow for scan-resistance and in-memory families 057 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 058 * family is a column family that should be served from memory if possible): 059 * single-access, multiple-accesses, and in-memory priority. 060 * A block is added with an in-memory priority flag if 061 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a 062 * single access priority the first time it is read into this block cache. If a block is 063 * accessed again while in cache, it is marked as a multiple access priority block. This 064 * delineation of blocks is used to prevent scans from thrashing the cache adding a 065 * least-frequently-used element to the eviction algorithm.<p> 066 * 067 * Each priority is given its own chunk of the total cache to ensure 068 * fairness during eviction. Each priority will retain close to its maximum 069 * size, however, if any priority is not using its entire chunk the others 070 * are able to grow beyond their chunk size.<p> 071 * 072 * Instantiated at a minimum with the total size and average block size. 073 * All sizes are in bytes. The block size is not especially important as this 074 * cache is fully dynamic in its sizing of blocks. 
It is only used for
 * pre-allocating data structures and in the initial heap estimation of the map.<p>
 *
 * The detailed constructor defines the shares of the cache given to the three priorities (as
 * fractions that must total 1.0). It also sets the levels that trigger and control the
 * eviction process.<p>
 *
 * The <code>acceptable size</code> is the cache size level which triggers the eviction
 * process to start. It evicts enough blocks to get the size below the
 * minimum size specified.<p>
 *
 * Eviction normally happens in a separate thread (or inline on the triggering thread when the
 * cache was built without an eviction thread) and involves a single full scan of the map. It
 * determines how many bytes must be freed to reach the minimum size, and while scanning
 * collects, for each of the three priorities, the least-recently-used blocks needed to free
 * that many bytes (so up to three times the bytes to free are collected). It then uses the
 * priority chunk sizes to evict fairly according to the relative sizes and usage.
 */
@InterfaceAudience.Private
public class LruBlockCache implements FirstLevelBlockCache {

  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

  /**
   * Configuration key for the fraction of the total size that an eviction run frees down to;
   * e.g. if set to .8, then we will keep evicting during an eviction run till the cache size is
   * down to 80% of the total.
*/
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /** Configuration key for the fraction of the maximum size above which eviction starts. */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.acceptable.factor";

  /**
   * Configuration key for the hard capacity limit, expressed as a multiple of the acceptable
   * size: new blocks are rejected once the cache size reaches this factor times the acceptable
   * size.
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
      "hbase.lru.blockcache.hard.capacity.limit.factor";

  // Configuration keys for the share of the cache given to each priority bucket.
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
      "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key for "in-memory force mode". Unlike the in-memory flag, which is a
   * column-family setting, this is a cluster-wide setting. When enabled, eviction takes blocks
   * from the single- and multi-access buckets first and only evicts in-memory blocks when those
   * two buckets cannot free enough space.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
      "hbase.lru.rs.inmemoryforcemode";

  // ---- Default configuration parameters ----

  // Backing ConcurrentHashMap tuning.
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  static final float DEFAULT_LOAD_FACTOR = 0.75f;

  // Eviction thresholds, as fractions of the maximum size.
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
  private static final float DEFAULT_MIN_FACTOR = 0.95f;

  // Priority bucket shares; together they must add up to 1.0.
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.5f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread: logging period, in seconds */
private static final int STAT_THREAD_PERIOD = 60 * 5; 150 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; 151 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 152 153 /** 154 * Defined the cache map as {@link ConcurrentHashMap} here, because in 155 * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent 156 * (key, func). Besides, the func method must execute exactly once only when the key is present 157 * and under the lock context, otherwise the reference count will be messed up. Notice that the 158 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 159 */ 160 private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map; 161 162 /** Eviction lock (locked when eviction in process) */ 163 private transient final ReentrantLock evictionLock = new ReentrantLock(true); 164 165 private final long maxBlockSize; 166 167 /** Volatile boolean to track if we are in an eviction process or not */ 168 private volatile boolean evictionInProgress = false; 169 170 /** Eviction thread */ 171 private transient final EvictionThread evictionThread; 172 173 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 174 private transient final ScheduledExecutorService scheduleThreadPool = 175 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 176 .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build()); 177 178 /** Current size of cache */ 179 private final AtomicLong size; 180 181 /** Current size of data blocks */ 182 private final LongAdder dataBlockSize; 183 184 /** Current number of cached elements */ 185 private final AtomicLong elements; 186 187 /** Current number of cached data block elements */ 188 private final LongAdder dataBlockElements; 189 190 /** Cache access count (sequential ID) */ 191 private final AtomicLong count; 192 193 /** hard capacity limit */ 194 private float hardCapacityLimitFactor; 195 196 
/** Cache statistics */ 197 private final CacheStats stats; 198 199 /** Maximum allowable size of cache (block put if size > max, evict) */ 200 private long maxSize; 201 202 /** Approximate block size */ 203 private long blockSize; 204 205 /** Acceptable size of cache (no evictions if size < acceptable) */ 206 private float acceptableFactor; 207 208 /** Minimum threshold of cache (when evicting, evict until size < min) */ 209 private float minFactor; 210 211 /** Single access bucket size */ 212 private float singleFactor; 213 214 /** Multiple access bucket size */ 215 private float multiFactor; 216 217 /** In-memory bucket size */ 218 private float memoryFactor; 219 220 /** Overhead of the structure itself */ 221 private long overhead; 222 223 /** Whether in-memory hfile's data block has higher priority when evicting */ 224 private boolean forceInMemory; 225 226 /** 227 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an 228 * external cache as L2. 229 * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache 230 */ 231 private transient BlockCache victimHandler = null; 232 233 /** 234 * Default constructor. Specify maximum size and expected average block 235 * size (approximation is fine). 236 * 237 * <p>All other factors will be calculated based on defaults specified in 238 * this class. 239 * 240 * @param maxSize maximum size of cache, in bytes 241 * @param blockSize approximate size of each block, in bytes 242 */ 243 public LruBlockCache(long maxSize, long blockSize) { 244 this(maxSize, blockSize, true); 245 } 246 247 /** 248 * Constructor used for testing. Allows disabling of the eviction thread. 
249 */ 250 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { 251 this(maxSize, blockSize, evictionThread, 252 (int) Math.ceil(1.2 * maxSize / blockSize), 253 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, 254 DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, 255 DEFAULT_SINGLE_FACTOR, 256 DEFAULT_MULTI_FACTOR, 257 DEFAULT_MEMORY_FACTOR, 258 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, 259 false, 260 DEFAULT_MAX_BLOCK_SIZE); 261 } 262 263 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { 264 this(maxSize, blockSize, evictionThread, 265 (int) Math.ceil(1.2 * maxSize / blockSize), 266 DEFAULT_LOAD_FACTOR, 267 DEFAULT_CONCURRENCY_LEVEL, 268 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 269 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 270 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), 271 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), 272 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), 273 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 274 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), 275 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), 276 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)); 277 } 278 279 public LruBlockCache(long maxSize, long blockSize, Configuration conf) { 280 this(maxSize, blockSize, true, conf); 281 } 282 283 /** 284 * Configurable constructor. Use this constructor if not using defaults. 
285 * 286 * @param maxSize maximum size of this cache, in bytes 287 * @param blockSize expected average size of blocks, in bytes 288 * @param evictionThread whether to run evictions in a bg thread or not 289 * @param mapInitialSize initial size of backing ConcurrentHashMap 290 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 291 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 292 * @param minFactor percentage of total size that eviction will evict until 293 * @param acceptableFactor percentage of total size that triggers eviction 294 * @param singleFactor percentage of total size for single-access blocks 295 * @param multiFactor percentage of total size for multiple-access blocks 296 * @param memoryFactor percentage of total size for in-memory blocks 297 */ 298 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 299 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, 300 float minFactor, float acceptableFactor, float singleFactor, 301 float multiFactor, float memoryFactor, float hardLimitFactor, 302 boolean forceInMemory, long maxBlockSize) { 303 this.maxBlockSize = maxBlockSize; 304 if(singleFactor + multiFactor + memoryFactor != 1 || 305 singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { 306 throw new IllegalArgumentException("Single, multi, and memory factors " + 307 " should be non-negative and total 1.0"); 308 } 309 if (minFactor >= acceptableFactor) { 310 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 311 } 312 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 313 throw new IllegalArgumentException("all factors must be < 1"); 314 } 315 this.maxSize = maxSize; 316 this.blockSize = blockSize; 317 this.forceInMemory = forceInMemory; 318 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 319 this.minFactor = minFactor; 320 this.acceptableFactor = acceptableFactor; 321 this.singleFactor = 
singleFactor; 322 this.multiFactor = multiFactor; 323 this.memoryFactor = memoryFactor; 324 this.stats = new CacheStats(this.getClass().getSimpleName()); 325 this.count = new AtomicLong(0); 326 this.elements = new AtomicLong(0); 327 this.dataBlockElements = new LongAdder(); 328 this.dataBlockSize = new LongAdder(); 329 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 330 this.size = new AtomicLong(this.overhead); 331 this.hardCapacityLimitFactor = hardLimitFactor; 332 if (evictionThread) { 333 this.evictionThread = new EvictionThread(this); 334 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 335 } else { 336 this.evictionThread = null; 337 } 338 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 339 // every five minutes. 340 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 341 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 342 } 343 344 @Override 345 public void setVictimCache(BlockCache victimCache) { 346 if (victimHandler != null) { 347 throw new IllegalArgumentException("The victim cache has already been set"); 348 } 349 victimHandler = requireNonNull(victimCache); 350 } 351 352 @Override 353 public void setMaxSize(long maxSize) { 354 this.maxSize = maxSize; 355 if (this.size.get() > acceptableSize() && !evictionInProgress) { 356 runEviction(); 357 } 358 } 359 360 /** 361 * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap 362 * access will be more faster then off-heap, the small index block or meta block cached in 363 * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always 364 * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the 365 * heap size will be messed up. Here we will clone the block into an heap block if it's an 366 * off-heap block, otherwise just use the original block. 
The key point is maintain the refCnt of 367 * the block (HBASE-22127): <br> 368 * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 369 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 370 * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by 371 * JVM, so need a retain here. 372 * @param buf the original block 373 * @return an block with an heap memory backend. 374 */ 375 private Cacheable asReferencedHeapBlock(Cacheable buf) { 376 if (buf instanceof HFileBlock) { 377 HFileBlock blk = ((HFileBlock) buf); 378 if (blk.isSharedMem()) { 379 return HFileBlock.deepCloneOnHeap(blk); 380 } 381 } 382 // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. 383 return buf.retain(); 384 } 385 386 // BlockCache implementation 387 388 /** 389 * Cache the block with the specified name and buffer. 390 * <p> 391 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 392 * this can happen, for which we compare the buffer contents. 393 * 394 * @param cacheKey block's cache key 395 * @param buf block buffer 396 * @param inMemory if block is in-memory 397 */ 398 @Override 399 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 400 if (buf.heapSize() > maxBlockSize) { 401 // If there are a lot of blocks that are too 402 // big this can make the logs way too noisy. 
403 // So we log 2% 404 if (stats.failInsert() % 50 == 0) { 405 LOG.warn("Trying to cache too large a block " 406 + cacheKey.getHfileName() + " @ " 407 + cacheKey.getOffset() 408 + " is " + buf.heapSize() 409 + " which is larger than " + maxBlockSize); 410 } 411 return; 412 } 413 414 LruCachedBlock cb = map.get(cacheKey); 415 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 416 return; 417 } 418 long currentSize = size.get(); 419 long currentAcceptableSize = acceptableSize(); 420 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 421 if (currentSize >= hardLimitSize) { 422 stats.failInsert(); 423 if (LOG.isTraceEnabled()) { 424 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 425 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 426 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 427 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 428 } 429 if (!evictionInProgress) { 430 runEviction(); 431 } 432 return; 433 } 434 // Ensure that the block is an heap one. 435 buf = asReferencedHeapBlock(buf); 436 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 437 long newSize = updateSizeMetrics(cb, false); 438 map.put(cacheKey, cb); 439 long val = elements.incrementAndGet(); 440 if (buf.getBlockType().isData()) { 441 dataBlockElements.increment(); 442 } 443 if (LOG.isTraceEnabled()) { 444 long size = map.size(); 445 assertCounterSanity(size, val); 446 } 447 if (newSize > currentAcceptableSize && !evictionInProgress) { 448 runEviction(); 449 } 450 } 451 452 /** 453 * Sanity-checking for parity between actual block cache content and metrics. 454 * Intended only for use with TRACE level logging and -ea JVM. 455 */ 456 private static void assertCounterSanity(long mapSize, long counterVal) { 457 if (counterVal < 0) { 458 LOG.trace("counterVal overflow. Assertions unreliable. 
counterVal=" + counterVal + 459 ", mapSize=" + mapSize); 460 return; 461 } 462 if (mapSize < Integer.MAX_VALUE) { 463 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 464 if (pct_diff > 0.05) { 465 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + 466 ", mapSize=" + mapSize); 467 } 468 } 469 } 470 471 /** 472 * Cache the block with the specified name and buffer. 473 * <p> 474 * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache 475 * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an 476 * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, 477 * otherwise the caching size is based on off-heap. 478 * @param cacheKey block's cache key 479 * @param buf block buffer 480 */ 481 @Override 482 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 483 cacheBlock(cacheKey, buf, false); 484 } 485 486 /** 487 * Helper function that updates the local size counter and also updates any 488 * per-cf or per-blocktype metrics it can discern from given 489 * {@link LruCachedBlock} 490 */ 491 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 492 long heapsize = cb.heapSize(); 493 BlockType bt = cb.getBuffer().getBlockType(); 494 if (evict) { 495 heapsize *= -1; 496 } 497 if (bt != null && bt.isData()) { 498 dataBlockSize.add(heapsize); 499 } 500 return size.addAndGet(heapsize); 501 } 502 503 /** 504 * Get the buffer of the block with the specified name. 
505 * 506 * @param cacheKey block's cache key 507 * @param caching true if the caller caches blocks on cache misses 508 * @param repeat Whether this is a repeat lookup for the same block 509 * (used to avoid double counting cache misses when doing double-check 510 * locking) 511 * @param updateCacheMetrics Whether to update cache metrics or not 512 * 513 * @return buffer of specified cache key, or null if not in cache 514 */ 515 @Override 516 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 517 boolean updateCacheMetrics) { 518 LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { 519 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 520 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 521 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 522 // see HBASE-22422. 523 val.getBuffer().retain(); 524 return val; 525 }); 526 if (cb == null) { 527 if (!repeat && updateCacheMetrics) { 528 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 529 } 530 // If there is another block cache then try and read there. 531 // However if this is a retry ( second time in double checked locking ) 532 // And it's already a miss then the l2 will also be a miss. 533 if (victimHandler != null && !repeat) { 534 // The handler will increase result's refCnt for RPC, so need no extra retain. 535 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 536 // Promote this to L1. 
537 if (result != null) { 538 if (caching) { 539 cacheBlock(cacheKey, result, /* inMemory = */ false); 540 } 541 } 542 return result; 543 } 544 return null; 545 } 546 if (updateCacheMetrics) { 547 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 548 } 549 cb.access(count.incrementAndGet()); 550 return cb.getBuffer(); 551 } 552 553 /** 554 * Whether the cache contains block with specified cacheKey 555 * 556 * @return true if contains the block 557 */ 558 @Override 559 public boolean containsBlock(BlockCacheKey cacheKey) { 560 return map.containsKey(cacheKey); 561 } 562 563 @Override 564 public boolean evictBlock(BlockCacheKey cacheKey) { 565 LruCachedBlock cb = map.get(cacheKey); 566 return cb != null && evictBlock(cb, false) > 0; 567 } 568 569 /** 570 * Evicts all blocks for a specific HFile. This is an 571 * expensive operation implemented as a linear-time search through all blocks 572 * in the cache. Ideally this should be a search in a log-access-time map. 573 * 574 * <p> 575 * This is used for evict-on-close to remove all blocks of a specific HFile. 
576 * 577 * @return the number of blocks evicted 578 */ 579 @Override 580 public int evictBlocksByHfileName(String hfileName) { 581 int numEvicted = 0; 582 for (BlockCacheKey key : map.keySet()) { 583 if (key.getHfileName().equals(hfileName)) { 584 if (evictBlock(key)) { 585 ++numEvicted; 586 } 587 } 588 } 589 if (victimHandler != null) { 590 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 591 } 592 return numEvicted; 593 } 594 595 /** 596 * Evict the block, and it will be cached by the victim handler if exists && 597 * block may be read again later 598 * 599 * @param evictedByEvictionProcess true if the given block is evicted by 600 * EvictionThread 601 * @return the heap size of evicted block 602 */ 603 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 604 LruCachedBlock previous = map.remove(block.getCacheKey()); 605 if (previous == null) { 606 return 0; 607 } 608 updateSizeMetrics(block, true); 609 long val = elements.decrementAndGet(); 610 if (LOG.isTraceEnabled()) { 611 long size = map.size(); 612 assertCounterSanity(size, val); 613 } 614 if (block.getBuffer().getBlockType().isData()) { 615 dataBlockElements.decrement(); 616 } 617 if (evictedByEvictionProcess) { 618 // When the eviction of the block happened because of invalidation of HFiles, no need to 619 // update the stats counter. 620 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 621 if (victimHandler != null) { 622 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 623 } 624 } 625 // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO 626 // NOT move this up because if do that then the victimHandler may access the buffer with 627 // refCnt = 0 which is disallowed. 628 previous.getBuffer().release(); 629 return block.heapSize(); 630 } 631 632 /** 633 * Multi-threaded call to run the eviction process. 
*
   * <p>When the cache was created without an eviction thread, {@link #evict()} runs directly on
   * the calling thread; otherwise this only wakes the eviction thread and returns.
   */
  private void runEviction() {
    if (evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

  /** @return true while an {@link #evict()} run is in progress */
  boolean isEvictionInProgress() {
    return evictionInProgress;
  }

  /** @return the fixed heap overhead computed for this cache at construction time */
  long getOverhead() {
    return overhead;
  }

  /**
   * Eviction method. Frees enough blocks to bring the cache size down to {@code minSize()}.
   *
   * <p>Only one eviction runs at a time: a caller that cannot immediately take the eviction lock
   * returns without doing anything. The whole map is scanned once and every block is put into
   * the bucket for its priority (single, multi, memory). Then:
   * <ul>
   * <li>if {@code forceInMemory} is set or the memory factor is (almost) 1.0, blocks are taken
   * from the single and multi buckets, aiming to leave them at a 1:2 ratio, and the memory
   * bucket is only touched when those two together cannot free enough;</li>
   * <li>otherwise buckets are visited from the smallest to the largest overflow over their
   * configured share. Each over-full bucket frees at most its overflow, and at most an equal
   * split of the bytes still to free across the buckets not yet visited.</li>
   * </ul>
   * The run is counted in the stats even when nothing needed to be freed.
   */
  void evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) {
      return;
    }

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) {
        return;
      }

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " +
              StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          // i.e. the target after eviction is single = bytesRemain / 3 and
          // multi = 2 * bytesRemain / 3.
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        // BlockBucket orders by overflow(), so poll() yields the least over-full bucket first.
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            // Free at most this bucket's overflow, and at most a fair share of what is left.
            long bucketBytesToFree =
                Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        // NOTE: BlockBucket#totalSize() is accumulated during the scan and is not reduced by
        // free(), so these are the bucket sizes observed before eviction.
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" +
          StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      // Runs on every exit path, including the early return when nothing needed freeing.
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }

  /** Summarizes the cache's size counters and bucket configuration, for debugging. */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
      .add("blockCount", getBlockCount())
      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
      .add("heapSize", StringUtils.byteDesc(heapSize()))
      .add("minSize", StringUtils.byteDesc(minSize()))
      .add("minFactor", minFactor)
      .add("multiSize", StringUtils.byteDesc(multiSize()))
      .add("multiFactor", multiFactor)
      .add("singleSize", StringUtils.byteDesc(singleSize()))
      .add("singleFactor", singleFactor)
      .toString();
  }

  /**
   * Used to group blocks into priority buckets. There will be a BlockBucket
   * for each priority (single, multi, memory). Once bucketed, the eviction
   * algorithm takes the appropriate number of elements out of each according
   * to configuration parameters and their relative sizes.
798 */ 799 private class BlockBucket implements Comparable<BlockBucket> { 800 801 private final String name; 802 private LruCachedBlockQueue queue; 803 private long totalSize = 0; 804 private long bucketSize; 805 806 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 807 this.name = name; 808 this.bucketSize = bucketSize; 809 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 810 totalSize = 0; 811 } 812 813 public void add(LruCachedBlock block) { 814 totalSize += block.heapSize(); 815 queue.add(block); 816 } 817 818 public long free(long toFree) { 819 if (LOG.isTraceEnabled()) { 820 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 821 } 822 LruCachedBlock cb; 823 long freedBytes = 0; 824 while ((cb = queue.pollLast()) != null) { 825 freedBytes += evictBlock(cb, true); 826 if (freedBytes >= toFree) { 827 return freedBytes; 828 } 829 } 830 if (LOG.isTraceEnabled()) { 831 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 832 } 833 return freedBytes; 834 } 835 836 public long overflow() { 837 return totalSize - bucketSize; 838 } 839 840 public long totalSize() { 841 return totalSize; 842 } 843 844 @Override 845 public int compareTo(BlockBucket that) { 846 return Long.compare(this.overflow(), that.overflow()); 847 } 848 849 @Override 850 public boolean equals(Object that) { 851 if (that == null || !(that instanceof BlockBucket)) { 852 return false; 853 } 854 return compareTo((BlockBucket)that) == 0; 855 } 856 857 @Override 858 public int hashCode() { 859 return Objects.hashCode(name, bucketSize, queue, totalSize); 860 } 861 862 @Override 863 public String toString() { 864 return MoreObjects.toStringHelper(this) 865 .add("name", name) 866 .add("totalSize", StringUtils.byteDesc(totalSize)) 867 .add("bucketSize", StringUtils.byteDesc(bucketSize)) 868 .toString(); 869 } 870 } 871 872 /** 873 * Get the maximum size of this cache. 
874 * 875 * @return max size in bytes 876 */ 877 878 @Override 879 public long getMaxSize() { 880 return this.maxSize; 881 } 882 883 @Override 884 public long getCurrentSize() { 885 return this.size.get(); 886 } 887 888 @Override 889 public long getCurrentDataSize() { 890 return this.dataBlockSize.sum(); 891 } 892 893 @Override 894 public long getFreeSize() { 895 return getMaxSize() - getCurrentSize(); 896 } 897 898 @Override 899 public long size() { 900 return getMaxSize(); 901 } 902 903 @Override 904 public long getBlockCount() { 905 return this.elements.get(); 906 } 907 908 @Override 909 public long getDataBlockCount() { 910 return this.dataBlockElements.sum(); 911 } 912 913 EvictionThread getEvictionThread() { 914 return this.evictionThread; 915 } 916 917 /* 918 * Eviction thread. Sits in waiting state until an eviction is triggered 919 * when the cache size grows above the acceptable level.<p> 920 * 921 * Thread is triggered into action by {@link LruBlockCache#runEviction()} 922 */ 923 static class EvictionThread extends Thread { 924 925 private WeakReference<LruBlockCache> cache; 926 private volatile boolean go = true; 927 // flag set after enter the run method, used for test 928 private boolean enteringRun = false; 929 930 public EvictionThread(LruBlockCache cache) { 931 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 932 setDaemon(true); 933 this.cache = new WeakReference<>(cache); 934 } 935 936 @Override 937 public void run() { 938 enteringRun = true; 939 while (this.go) { 940 synchronized (this) { 941 try { 942 this.wait(1000 * 10/*Don't wait for ever*/); 943 } catch (InterruptedException e) { 944 LOG.warn("Interrupted eviction thread ", e); 945 Thread.currentThread().interrupt(); 946 } 947 } 948 LruBlockCache cache = this.cache.get(); 949 if (cache == null) { 950 break; 951 } 952 cache.evict(); 953 } 954 } 955 956 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", 957 justification="This is what we 
want") 958 public void evict() { 959 synchronized (this) { 960 this.notifyAll(); 961 } 962 } 963 964 synchronized void shutdown() { 965 this.go = false; 966 this.notifyAll(); 967 } 968 969 /** 970 * Used for the test. 971 */ 972 boolean isEnteringRun() { 973 return this.enteringRun; 974 } 975 } 976 977 /* 978 * Statistics thread. Periodically prints the cache statistics to the log. 979 */ 980 static class StatisticsThread extends Thread { 981 982 private final LruBlockCache lru; 983 984 public StatisticsThread(LruBlockCache lru) { 985 super("LruBlockCacheStats"); 986 setDaemon(true); 987 this.lru = lru; 988 } 989 990 @Override 991 public void run() { 992 lru.logStats(); 993 } 994 } 995 996 public void logStats() { 997 // Log size 998 long totalSize = heapSize(); 999 long freeSize = maxSize - totalSize; 1000 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 1001 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 1002 "max=" + StringUtils.byteDesc(this.maxSize) + ", " + 1003 "blockCount=" + getBlockCount() + ", " + 1004 "accesses=" + stats.getRequestCount() + ", " + 1005 "hits=" + stats.getHitCount() + ", " + 1006 "hitRatio=" + (stats.getHitCount() == 0 ? 1007 "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + 1008 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 1009 "cachingHits=" + stats.getHitCachingCount() + ", " + 1010 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 1011 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 1012 "evictions=" + stats.getEvictionCount() + ", " + 1013 "evicted=" + stats.getEvictedCount() + ", " + 1014 "evictedPerRun=" + stats.evictedPerEviction()); 1015 } 1016 1017 /** 1018 * Get counter statistics for this cache. 1019 * 1020 * <p>Includes: total accesses, hits, misses, evicted blocks, and runs 1021 * of the eviction processes. 
1022 */ 1023 @Override 1024 public CacheStats getStats() { 1025 return this.stats; 1026 } 1027 1028 public final static long CACHE_FIXED_OVERHEAD = 1029 ClassSize.estimateBase(LruBlockCache.class, false); 1030 1031 @Override 1032 public long heapSize() { 1033 return getCurrentSize(); 1034 } 1035 1036 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 1037 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 1038 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 1039 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 1040 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 1041 } 1042 1043 @Override 1044 public Iterator<CachedBlock> iterator() { 1045 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 1046 1047 return new Iterator<CachedBlock>() { 1048 private final long now = System.nanoTime(); 1049 1050 @Override 1051 public boolean hasNext() { 1052 return iterator.hasNext(); 1053 } 1054 1055 @Override 1056 public CachedBlock next() { 1057 final LruCachedBlock b = iterator.next(); 1058 return new CachedBlock() { 1059 @Override 1060 public String toString() { 1061 return BlockCacheUtil.toString(this, now); 1062 } 1063 1064 @Override 1065 public BlockPriority getBlockPriority() { 1066 return b.getPriority(); 1067 } 1068 1069 @Override 1070 public BlockType getBlockType() { 1071 return b.getBuffer().getBlockType(); 1072 } 1073 1074 @Override 1075 public long getOffset() { 1076 return b.getCacheKey().getOffset(); 1077 } 1078 1079 @Override 1080 public long getSize() { 1081 return b.getBuffer().heapSize(); 1082 } 1083 1084 @Override 1085 public long getCachedTime() { 1086 return b.getCachedTime(); 1087 } 1088 1089 @Override 1090 public String getFilename() { 1091 return b.getCacheKey().getHfileName(); 1092 } 1093 1094 @Override 1095 public int compareTo(CachedBlock other) { 1096 int diff = this.getFilename().compareTo(other.getFilename()); 1097 if (diff != 0) { 1098 
return diff; 1099 } 1100 diff = Long.compare(this.getOffset(), other.getOffset()); 1101 if (diff != 0) { 1102 return diff; 1103 } 1104 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1105 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1106 } 1107 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1108 } 1109 1110 @Override 1111 public int hashCode() { 1112 return b.hashCode(); 1113 } 1114 1115 @Override 1116 public boolean equals(Object obj) { 1117 if (obj instanceof CachedBlock) { 1118 CachedBlock cb = (CachedBlock)obj; 1119 return compareTo(cb) == 0; 1120 } else { 1121 return false; 1122 } 1123 } 1124 }; 1125 } 1126 1127 @Override 1128 public void remove() { 1129 throw new UnsupportedOperationException(); 1130 } 1131 }; 1132 } 1133 1134 // Simple calculators of sizes given factors and maxSize 1135 1136 long acceptableSize() { 1137 return (long)Math.floor(this.maxSize * this.acceptableFactor); 1138 } 1139 private long minSize() { 1140 return (long)Math.floor(this.maxSize * this.minFactor); 1141 } 1142 private long singleSize() { 1143 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1144 } 1145 private long multiSize() { 1146 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1147 } 1148 private long memorySize() { 1149 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1150 } 1151 1152 @Override 1153 public void shutdown() { 1154 if (victimHandler != null) { 1155 victimHandler.shutdown(); 1156 } 1157 this.scheduleThreadPool.shutdown(); 1158 for (int i = 0; i < 10; i++) { 1159 if (!this.scheduleThreadPool.isShutdown()) { 1160 try { 1161 Thread.sleep(10); 1162 } catch (InterruptedException e) { 1163 LOG.warn("Interrupted while sleeping"); 1164 Thread.currentThread().interrupt(); 1165 break; 1166 } 1167 } 1168 } 1169 1170 if (!this.scheduleThreadPool.isShutdown()) { 1171 List<Runnable> runnables = 
this.scheduleThreadPool.shutdownNow(); 1172 LOG.debug("Still running " + runnables); 1173 } 1174 this.evictionThread.shutdown(); 1175 } 1176 1177 /** Clears the cache. Used in tests. */ 1178 public void clearCache() { 1179 this.map.clear(); 1180 this.elements.set(0); 1181 } 1182 1183 /** 1184 * Used in testing. May be very inefficient. 1185 * 1186 * @return the set of cached file names 1187 */ 1188 SortedSet<String> getCachedFileNamesForTest() { 1189 SortedSet<String> fileNames = new TreeSet<>(); 1190 for (BlockCacheKey cacheKey : map.keySet()) { 1191 fileNames.add(cacheKey.getHfileName()); 1192 } 1193 return fileNames; 1194 } 1195 1196 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1197 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1198 for (LruCachedBlock block : map.values()) { 1199 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1200 Integer count = counts.get(encoding); 1201 counts.put(encoding, (count == null ? 0 : count) + 1); 1202 } 1203 return counts; 1204 } 1205 1206 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1207 return map; 1208 } 1209 1210 @Override 1211 public BlockCache[] getBlockCaches() { 1212 if (victimHandler != null) { 1213 return new BlockCache[] { this, this.victimHandler }; 1214 } 1215 return null; 1216 } 1217}