001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.lang.ref.WeakReference; 023import java.util.EnumMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.PriorityQueue; 028import java.util.SortedSet; 029import java.util.TreeSet; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.LongAdder; 036import java.util.concurrent.locks.ReentrantLock; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.io.HeapSize; 039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.ClassSize; 042import org.apache.hadoop.util.StringUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 048import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 049import org.apache.hbase.thirdparty.com.google.common.base.Objects; 050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 051 052/** 053 * A block cache implementation that is memory-aware using {@link HeapSize}, 054 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a 055 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 056 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p> 057 * 058 * Contains three levels of block priority to allow for scan-resistance and in-memory families 059 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 060 * family is a column family that should be served from memory if possible): 061 * single-access, multiple-accesses, and in-memory priority. 062 * A block is added with an in-memory priority flag if 063 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a 064 * single access priority the first time it is read into this block cache. If a block is 065 * accessed again while in cache, it is marked as a multiple access priority block. This 066 * delineation of blocks is used to prevent scans from thrashing the cache adding a 067 * least-frequently-used element to the eviction algorithm.<p> 068 * 069 * Each priority is given its own chunk of the total cache to ensure 070 * fairness during eviction. Each priority will retain close to its maximum 071 * size, however, if any priority is not using its entire chunk the others 072 * are able to grow beyond their chunk size.<p> 073 * 074 * Instantiated at a minimum with the total size and average block size. 075 * All sizes are in bytes. The block size is not especially important as this 076 * cache is fully dynamic in its sizing of blocks. 
It is only used for 077 * pre-allocating data structures and in initial heap estimation of the map.<p> 078 * 079 * The detailed constructor defines the sizes for the three priorities (they 080 * should total to the <code>maximum size</code> defined). It also sets the levels that 081 * trigger and control the eviction thread.<p> 082 * 083 * The <code>acceptable size</code> is the cache size level which triggers the eviction 084 * process to start. It evicts enough blocks to get the size below the 085 * minimum size specified.<p> 086 * 087 * Eviction happens in a separate thread and involves a single full-scan 088 * of the map. It determines how many bytes must be freed to reach the minimum 089 * size, and then while scanning determines the fewest least-recently-used 090 * blocks necessary from each of the three priorities (would be 3 times bytes 091 * to free). It then uses the priority chunk sizes to evict fairly according 092 * to the relative sizes and usage. 093 */ 094@InterfaceAudience.Private 095public class LruBlockCache implements FirstLevelBlockCache { 096 097 private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class); 098 099 /** 100 * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep 101 * evicting during an eviction run till the cache size is down to 80% of the total. 
/**
 * Configuration key for the minimum factor: the percentage of total size that eviction will
 * evict until; e.g. if set to .8, then we will keep evicting during an eviction run till the
 * cache size is down to 80% of the total.
 */
private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

/**
 * Configuration key for the acceptable factor: the acceptable size of the cache (no evictions
 * are triggered while the size stays below the acceptable size).
 */
private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.acceptable.factor";

/**
 * Configuration key for the hard capacity limit factor. A put is rejected once the current size
 * reaches this factor multiplied by the acceptable size.
 */
static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.hard.capacity.limit.factor";
// Configuration keys for the share of the cache given to the single-access, multiple-access and
// in-memory priorities. The configurable constructor requires the three values to total 1.0.
private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.single.percentage";
private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.multi.percentage";
private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.memory.percentage";

/**
 * Configuration key that forces data blocks of in-memory hfiles to stay cached in memory
 * whenever possible (unless there are too many in-memory blocks). Unlike inMemory, which is a
 * column-family setting, inMemoryForceMode is a cluster-wide setting.
 */
private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
    "hbase.lru.rs.inmemoryforcemode";

/* Default Configuration Parameters */

/* Backing Concurrent Map Configuration */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/* Eviction thresholds */
private static final float DEFAULT_MIN_FACTOR = 0.95f;
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

/* Priority buckets (the three defaults total 1.0) */
private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
private static final float DEFAULT_MULTI_FACTOR = 0.50f;
private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

/* Multiplier applied to the acceptable size to obtain the hard limit for puts */
private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

/* Statistics thread: period in seconds between two statistics log lines (five minutes) */
private static final int STAT_THREAD_PERIOD = 60 * 5;
/* Configuration key and default for the largest block (by heapSize()) that will be cached */
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

/**
 * The cache map is declared as a {@link ConcurrentHashMap} because
 * {@link LruBlockCache#getBlock} needs map#computeIfPresent(key, func) to be atomic. Besides,
 * the func must execute exactly once, only when the key is present and under the lock context,
 * otherwise the reference count will be messed up. Notice that the
 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
 */
private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

/** Eviction lock (locked when eviction in process); created as a fair lock */
private transient final ReentrantLock evictionLock = new ReentrantLock(true);

/** Blocks whose heapSize() exceeds this value are refused by cacheBlock */
private final long maxBlockSize;

/** Volatile boolean to track if we are in an eviction process or not */
private volatile boolean evictionInProgress = false;

/** Eviction thread; null when the cache was built with the eviction thread disabled */
private transient final EvictionThread evictionThread;

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
        .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

/** Current size of cache; the constructor seeds it with the structure overhead */
private final AtomicLong size;

/** Current size of data blocks */
private final LongAdder dataBlockSize;

/** Current number of cached elements */
private final AtomicLong elements;

/** Current number of cached data block elements */
private final LongAdder dataBlockElements;

/** Cache access count (sequential ID) */
private final AtomicLong count;

/** Hard capacity limit factor, multiplied by the acceptable size to get the hard limit */
private float hardCapacityLimitFactor;
/** Cache statistics */
private final CacheStats stats;

/** Maximum allowable size of cache (block put if size > max, evict) */
private long maxSize;

/** Approximate block size; used for pre-sizing, not as a limit */
private long blockSize;

/** Acceptable size of cache (no evictions if size < acceptable) */
private float acceptableFactor;

/** Minimum threshold of cache (when evicting, evict until size < min) */
private float minFactor;

/** Single access bucket size, as a fraction of the total */
private float singleFactor;

/** Multiple access bucket size, as a fraction of the total */
private float multiFactor;

/** In-memory bucket size, as a fraction of the total */
private float memoryFactor;

/** Overhead of the structure itself */
private long overhead;

/** Whether in-memory hfile's data block has higher priority when evicting */
private boolean forceInMemory;

/**
 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
 * external cache as L2.
 * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
 */
private transient BlockCache victimHandler = null;

/**
 * Default constructor. Specify maximum size and expected average block
 * size (approximation is fine).
 *
 * <p>All other factors will be calculated based on defaults specified in
 * this class. The eviction thread is enabled.
 *
 * @param maxSize   maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 */
public LruBlockCache(long maxSize, long blockSize) {
  this(maxSize, blockSize, true);
}
/**
 * Test-oriented constructor: every tunable takes its built-in default and the caller only
 * decides whether the background eviction thread is started.
 *
 * @param maxSize        maximum size of cache, in bytes
 * @param blockSize      approximate size of each block, in bytes
 * @param evictionThread whether to run evictions in a background thread
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
  this(
      maxSize,
      blockSize,
      evictionThread,
      estimateInitialMapSize(maxSize, blockSize),
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      DEFAULT_MIN_FACTOR,
      DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR,
      DEFAULT_MULTI_FACTOR,
      DEFAULT_MEMORY_FACTOR,
      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
      false, // in-memory force mode is off for this constructor
      DEFAULT_MAX_BLOCK_SIZE);
}

/**
 * Builds a cache whose eviction thresholds, priority shares, hard limit factor, in-memory force
 * mode and maximum block size are read from {@code conf}, falling back to the class defaults
 * for any key that is not set.
 *
 * @param maxSize        maximum size of cache, in bytes
 * @param blockSize      approximate size of each block, in bytes
 * @param evictionThread whether to run evictions in a background thread
 * @param conf           configuration to read the tunables from
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
  this(
      maxSize,
      blockSize,
      evictionThread,
      estimateInitialMapSize(maxSize, blockSize),
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
          DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
}

/**
 * Same as {@link #LruBlockCache(long, long, boolean, Configuration)} with the eviction thread
 * enabled.
 *
 * @param maxSize   maximum size of cache, in bytes
 * @param blockSize approximate size of each block, in bytes
 * @param conf      configuration to read the tunables from
 */
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
  this(maxSize, blockSize, true, conf);
}

/**
 * Initial capacity handed to the backing map by the convenience constructors: the number of
 * average-sized blocks that fit in 1.2 times the maximum size, rounded up.
 */
private static int estimateInitialMapSize(long maxSize, long blockSize) {
  return (int) Math.ceil(1.2 * maxSize / blockSize);
}
287 * 288 * @param maxSize maximum size of this cache, in bytes 289 * @param blockSize expected average size of blocks, in bytes 290 * @param evictionThread whether to run evictions in a bg thread or not 291 * @param mapInitialSize initial size of backing ConcurrentHashMap 292 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 293 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 294 * @param minFactor percentage of total size that eviction will evict until 295 * @param acceptableFactor percentage of total size that triggers eviction 296 * @param singleFactor percentage of total size for single-access blocks 297 * @param multiFactor percentage of total size for multiple-access blocks 298 * @param memoryFactor percentage of total size for in-memory blocks 299 */ 300 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 301 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, 302 float minFactor, float acceptableFactor, float singleFactor, 303 float multiFactor, float memoryFactor, float hardLimitFactor, 304 boolean forceInMemory, long maxBlockSize) { 305 this.maxBlockSize = maxBlockSize; 306 if(singleFactor + multiFactor + memoryFactor != 1 || 307 singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { 308 throw new IllegalArgumentException("Single, multi, and memory factors " + 309 " should be non-negative and total 1.0"); 310 } 311 if (minFactor >= acceptableFactor) { 312 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 313 } 314 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 315 throw new IllegalArgumentException("all factors must be < 1"); 316 } 317 this.maxSize = maxSize; 318 this.blockSize = blockSize; 319 this.forceInMemory = forceInMemory; 320 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 321 this.minFactor = minFactor; 322 this.acceptableFactor = acceptableFactor; 323 this.singleFactor = 
singleFactor; 324 this.multiFactor = multiFactor; 325 this.memoryFactor = memoryFactor; 326 this.stats = new CacheStats(this.getClass().getSimpleName()); 327 this.count = new AtomicLong(0); 328 this.elements = new AtomicLong(0); 329 this.dataBlockElements = new LongAdder(); 330 this.dataBlockSize = new LongAdder(); 331 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 332 this.size = new AtomicLong(this.overhead); 333 this.hardCapacityLimitFactor = hardLimitFactor; 334 if (evictionThread) { 335 this.evictionThread = new EvictionThread(this); 336 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 337 } else { 338 this.evictionThread = null; 339 } 340 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 341 // every five minutes. 342 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 343 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 344 } 345 346 @Override 347 public void setVictimCache(BlockCache victimCache) { 348 if (victimHandler != null) { 349 throw new IllegalArgumentException("The victim cache has already been set"); 350 } 351 victimHandler = requireNonNull(victimCache); 352 } 353 354 @Override 355 public void setMaxSize(long maxSize) { 356 this.maxSize = maxSize; 357 if (this.size.get() > acceptableSize() && !evictionInProgress) { 358 runEviction(); 359 } 360 } 361 362 /** 363 * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap 364 * access will be more faster then off-heap, the small index block or meta block cached in 365 * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always 366 * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the 367 * heap size will be messed up. Here we will clone the block into an heap block if it's an 368 * off-heap block, otherwise just use the original block. 
The key point is maintain the refCnt of 369 * the block (HBASE-22127): <br> 370 * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 371 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 372 * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by 373 * JVM, so need a retain here. 374 * @param buf the original block 375 * @return an block with an heap memory backend. 376 */ 377 private Cacheable asReferencedHeapBlock(Cacheable buf) { 378 if (buf instanceof HFileBlock) { 379 HFileBlock blk = ((HFileBlock) buf); 380 if (blk.isSharedMem()) { 381 return HFileBlock.deepCloneOnHeap(blk); 382 } 383 } 384 // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. 385 return buf.retain(); 386 } 387 388 // BlockCache implementation 389 390 /** 391 * Cache the block with the specified name and buffer. 392 * <p> 393 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 394 * this can happen, for which we compare the buffer contents. 395 * 396 * @param cacheKey block's cache key 397 * @param buf block buffer 398 * @param inMemory if block is in-memory 399 */ 400 @Override 401 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 402 if (buf.heapSize() > maxBlockSize) { 403 // If there are a lot of blocks that are too 404 // big this can make the logs way too noisy. 
405 // So we log 2% 406 if (stats.failInsert() % 50 == 0) { 407 LOG.warn("Trying to cache too large a block " 408 + cacheKey.getHfileName() + " @ " 409 + cacheKey.getOffset() 410 + " is " + buf.heapSize() 411 + " which is larger than " + maxBlockSize); 412 } 413 return; 414 } 415 416 LruCachedBlock cb = map.get(cacheKey); 417 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 418 return; 419 } 420 long currentSize = size.get(); 421 long currentAcceptableSize = acceptableSize(); 422 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 423 if (currentSize >= hardLimitSize) { 424 stats.failInsert(); 425 if (LOG.isTraceEnabled()) { 426 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 427 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 428 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 429 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 430 } 431 if (!evictionInProgress) { 432 runEviction(); 433 } 434 return; 435 } 436 // Ensure that the block is an heap one. 437 buf = asReferencedHeapBlock(buf); 438 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 439 long newSize = updateSizeMetrics(cb, false); 440 map.put(cacheKey, cb); 441 long val = elements.incrementAndGet(); 442 if (buf.getBlockType().isData()) { 443 dataBlockElements.increment(); 444 } 445 if (LOG.isTraceEnabled()) { 446 long size = map.size(); 447 assertCounterSanity(size, val); 448 } 449 if (newSize > currentAcceptableSize && !evictionInProgress) { 450 runEviction(); 451 } 452 } 453 454 /** 455 * Sanity-checking for parity between actual block cache content and metrics. 456 * Intended only for use with TRACE level logging and -ea JVM. 457 */ 458 private static void assertCounterSanity(long mapSize, long counterVal) { 459 if (counterVal < 0) { 460 LOG.trace("counterVal overflow. Assertions unreliable. 
counterVal=" + counterVal + 461 ", mapSize=" + mapSize); 462 return; 463 } 464 if (mapSize < Integer.MAX_VALUE) { 465 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 466 if (pct_diff > 0.05) { 467 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + 468 ", mapSize=" + mapSize); 469 } 470 } 471 } 472 473 /** 474 * Cache the block with the specified name and buffer. 475 * <p> 476 * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache 477 * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an 478 * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, 479 * otherwise the caching size is based on off-heap. 480 * @param cacheKey block's cache key 481 * @param buf block buffer 482 */ 483 @Override 484 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 485 cacheBlock(cacheKey, buf, false); 486 } 487 488 /** 489 * Helper function that updates the local size counter and also updates any 490 * per-cf or per-blocktype metrics it can discern from given 491 * {@link LruCachedBlock} 492 */ 493 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 494 long heapsize = cb.heapSize(); 495 BlockType bt = cb.getBuffer().getBlockType(); 496 if (evict) { 497 heapsize *= -1; 498 } 499 if (bt != null && bt.isData()) { 500 dataBlockSize.add(heapsize); 501 } 502 return size.addAndGet(heapsize); 503 } 504 505 /** 506 * Get the buffer of the block with the specified name. 
507 * 508 * @param cacheKey block's cache key 509 * @param caching true if the caller caches blocks on cache misses 510 * @param repeat Whether this is a repeat lookup for the same block 511 * (used to avoid double counting cache misses when doing double-check 512 * locking) 513 * @param updateCacheMetrics Whether to update cache metrics or not 514 * 515 * @return buffer of specified cache key, or null if not in cache 516 */ 517 @Override 518 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 519 boolean updateCacheMetrics) { 520 LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { 521 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 522 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 523 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 524 // see HBASE-22422. 525 val.getBuffer().retain(); 526 return val; 527 }); 528 if (cb == null) { 529 if (!repeat && updateCacheMetrics) { 530 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 531 } 532 // If there is another block cache then try and read there. 533 // However if this is a retry ( second time in double checked locking ) 534 // And it's already a miss then the l2 will also be a miss. 535 if (victimHandler != null && !repeat) { 536 // The handler will increase result's refCnt for RPC, so need no extra retain. 537 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 538 // Promote this to L1. 
539 if (result != null) { 540 if (caching) { 541 cacheBlock(cacheKey, result, /* inMemory = */ false); 542 } 543 } 544 return result; 545 } 546 return null; 547 } 548 if (updateCacheMetrics) { 549 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 550 } 551 cb.access(count.incrementAndGet()); 552 return cb.getBuffer(); 553 } 554 555 /** 556 * Whether the cache contains block with specified cacheKey 557 * 558 * @return true if contains the block 559 */ 560 @Override 561 public boolean containsBlock(BlockCacheKey cacheKey) { 562 return map.containsKey(cacheKey); 563 } 564 565 @Override 566 public boolean evictBlock(BlockCacheKey cacheKey) { 567 LruCachedBlock cb = map.get(cacheKey); 568 return cb != null && evictBlock(cb, false) > 0; 569 } 570 571 /** 572 * Evicts all blocks for a specific HFile. This is an 573 * expensive operation implemented as a linear-time search through all blocks 574 * in the cache. Ideally this should be a search in a log-access-time map. 575 * 576 * <p> 577 * This is used for evict-on-close to remove all blocks of a specific HFile. 
578 * 579 * @return the number of blocks evicted 580 */ 581 @Override 582 public int evictBlocksByHfileName(String hfileName) { 583 int numEvicted = 0; 584 for (BlockCacheKey key : map.keySet()) { 585 if (key.getHfileName().equals(hfileName)) { 586 if (evictBlock(key)) 587 ++numEvicted; 588 } 589 } 590 if (victimHandler != null) { 591 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 592 } 593 return numEvicted; 594 } 595 596 /** 597 * Evict the block, and it will be cached by the victim handler if exists && 598 * block may be read again later 599 * 600 * @param evictedByEvictionProcess true if the given block is evicted by 601 * EvictionThread 602 * @return the heap size of evicted block 603 */ 604 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 605 LruCachedBlock previous = map.remove(block.getCacheKey()); 606 if (previous == null) { 607 return 0; 608 } 609 updateSizeMetrics(block, true); 610 long val = elements.decrementAndGet(); 611 if (LOG.isTraceEnabled()) { 612 long size = map.size(); 613 assertCounterSanity(size, val); 614 } 615 if (block.getBuffer().getBlockType().isData()) { 616 dataBlockElements.decrement(); 617 } 618 if (evictedByEvictionProcess) { 619 // When the eviction of the block happened because of invalidation of HFiles, no need to 620 // update the stats counter. 621 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 622 if (victimHandler != null) { 623 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 624 } 625 } 626 // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO 627 // NOT move this up because if do that then the victimHandler may access the buffer with 628 // refCnt = 0 which is disallowed. 629 previous.getBuffer().release(); 630 return block.heapSize(); 631 } 632 633 /** 634 * Multi-threaded call to run the eviction process. 
/**
 * Multi-threaded call to run the eviction process. Evicts on the calling thread when the cache
 * was built without an eviction thread, otherwise wakes the eviction thread up.
 */
private void runEviction() {
  if (evictionThread == null) {
    evict();
  } else {
    evictionThread.evict();
  }
}

/** Exposes the eviction-in-progress flag to tests. */
@VisibleForTesting
boolean isEvictionInProgress() {
  return evictionInProgress;
}

/** Exposes the structure overhead computed at construction time to tests. */
@VisibleForTesting
long getOverhead() {
  return overhead;
}

/**
 * Eviction method. Sorts every cached block into one of three priority buckets and frees
 * blocks until roughly {@code currentSize - minSize()} bytes have been released.
 * <p>
 * Only one eviction runs at a time: if the eviction lock is already held, the call returns
 * immediately. Every call that did obtain the lock is counted through {@code stats.evict()},
 * including one that finds nothing to free.
 */
void evict() {

  // Ensure only one eviction at a time
  if(!evictionLock.tryLock()) return;

  try {
    evictionInProgress = true;
    long currentSize = this.size.get();
    // NOTE(review): minSize() is defined outside this view; presumably maxSize * minFactor.
    long bytesToFree = currentSize - minSize();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Block cache LRU eviction started; Attempting to free " +
        StringUtils.byteDesc(bytesToFree) + " of total=" +
        StringUtils.byteDesc(currentSize));
    }

    // Already at or below the minimum size; the finally block still runs.
    if (bytesToFree <= 0) return;

    // Instantiate priority buckets
    BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
    BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
    BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

    // Scan entire map putting into appropriate buckets
    for (LruCachedBlock cachedBlock : map.values()) {
      switch (cachedBlock.getPriority()) {
        case SINGLE: {
          bucketSingle.add(cachedBlock);
          break;
        }
        case MULTI: {
          bucketMulti.add(cachedBlock);
          break;
        }
        case MEMORY: {
          bucketMemory.add(cachedBlock);
          break;
        }
      }
    }

    long bytesFreed = 0;
    if (forceInMemory || memoryFactor > 0.999f) {
      // In-memory blocks are protected: they are touched only when single and multi together
      // cannot cover the amount to free.
      long s = bucketSingle.totalSize();
      long m = bucketMulti.totalSize();
      if (bytesToFree > (s + m)) {
        // this means we need to evict blocks in memory bucket to make room,
        // so the single and multi buckets will be emptied
        bytesFreed = bucketSingle.free(s);
        bytesFreed += bucketMulti.free(m);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " from single and multi buckets");
        }
        bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " total from all three buckets ");
        }
      } else {
        // this means no need to evict block in memory bucket,
        // and we try best to make the ratio between single-bucket and
        // multi-bucket is 1:2
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
          // single-bucket is small enough that no eviction happens for it
          // hence all eviction goes from multi-bucket
          bytesFreed = bucketMulti.free(bytesToFree);
        } else if (3 * m <= 2 * bytesRemain) {
          // multi-bucket is small enough that no eviction happens for it
          // hence all eviction goes from single-bucket
          bytesFreed = bucketSingle.free(bytesToFree);
        } else {
          // both buckets need to evict some blocks: shrink single to one third of what
          // remains, then take the rest from multi
          bytesFreed = bucketSingle.free(s - bytesRemain / 3);
          if (bytesFreed < bytesToFree) {
            bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
          }
        }
      }
    } else {
      // Buckets are polled in ascending order of overflow (BlockBucket.compareTo), and each
      // overflowing bucket gives up at most an equal share of what is still to be freed.
      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();

      BlockBucket bucket;
      while ((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
    }
    if (LOG.isTraceEnabled()) {
      // Bucket totals are accumulated while scanning and are not reduced by free(), so the
      // per-bucket figures below describe the state before this eviction.
      long single = bucketSingle.totalSize();
      long multi = bucketMulti.totalSize();
      long memory = bucketMemory.totalSize();
      LOG.trace("Block cache LRU eviction completed; " +
        "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
        "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
        "single=" + StringUtils.byteDesc(single) + ", " +
        "multi=" + StringUtils.byteDesc(multi) + ", " +
        "memory=" + StringUtils.byteDesc(memory));
    }
  } finally {
    stats.evict();
    evictionInProgress = false;
    evictionLock.unlock();
  }
}

/**
 * Describes the cache through its block count, its current, free, maximum, heap and minimum
 * sizes, and the size and factor of the multi and single priorities.
 */
@Override
public String toString() {
  return MoreObjects.toStringHelper(this)
    .add("blockCount", getBlockCount())
    .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
    .add("freeSize", StringUtils.byteDesc(getFreeSize()))
    .add("maxSize", StringUtils.byteDesc(getMaxSize()))
    .add("heapSize", StringUtils.byteDesc(heapSize()))
    .add("minSize", StringUtils.byteDesc(minSize()))
    .add("minFactor", minFactor)
    .add("multiSize", StringUtils.byteDesc(multiSize()))
    .add("multiFactor", multiFactor)
    .add("singleSize", StringUtils.byteDesc(singleSize()))
    .add("singleFactor", singleFactor)
    .toString();
}
797 */ 798 private class BlockBucket implements Comparable<BlockBucket> { 799 800 private final String name; 801 private LruCachedBlockQueue queue; 802 private long totalSize = 0; 803 private long bucketSize; 804 805 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 806 this.name = name; 807 this.bucketSize = bucketSize; 808 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 809 totalSize = 0; 810 } 811 812 public void add(LruCachedBlock block) { 813 totalSize += block.heapSize(); 814 queue.add(block); 815 } 816 817 public long free(long toFree) { 818 if (LOG.isTraceEnabled()) { 819 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 820 } 821 LruCachedBlock cb; 822 long freedBytes = 0; 823 while ((cb = queue.pollLast()) != null) { 824 freedBytes += evictBlock(cb, true); 825 if (freedBytes >= toFree) { 826 return freedBytes; 827 } 828 } 829 if (LOG.isTraceEnabled()) { 830 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 831 } 832 return freedBytes; 833 } 834 835 public long overflow() { 836 return totalSize - bucketSize; 837 } 838 839 public long totalSize() { 840 return totalSize; 841 } 842 843 @Override 844 public int compareTo(BlockBucket that) { 845 return Long.compare(this.overflow(), that.overflow()); 846 } 847 848 @Override 849 public boolean equals(Object that) { 850 if (that == null || !(that instanceof BlockBucket)) { 851 return false; 852 } 853 return compareTo((BlockBucket)that) == 0; 854 } 855 856 @Override 857 public int hashCode() { 858 return Objects.hashCode(name, bucketSize, queue, totalSize); 859 } 860 861 @Override 862 public String toString() { 863 return MoreObjects.toStringHelper(this) 864 .add("name", name) 865 .add("totalSize", StringUtils.byteDesc(totalSize)) 866 .add("bucketSize", StringUtils.byteDesc(bucketSize)) 867 .toString(); 868 } 869 } 870 871 /** 872 * Get the maximum size of this cache. 
873 * 874 * @return max size in bytes 875 */ 876 877 @Override 878 public long getMaxSize() { 879 return this.maxSize; 880 } 881 882 @Override 883 public long getCurrentSize() { 884 return this.size.get(); 885 } 886 887 @Override 888 public long getCurrentDataSize() { 889 return this.dataBlockSize.sum(); 890 } 891 892 @Override 893 public long getFreeSize() { 894 return getMaxSize() - getCurrentSize(); 895 } 896 897 @Override 898 public long size() { 899 return getMaxSize(); 900 } 901 902 @Override 903 public long getBlockCount() { 904 return this.elements.get(); 905 } 906 907 @Override 908 public long getDataBlockCount() { 909 return this.dataBlockElements.sum(); 910 } 911 912 EvictionThread getEvictionThread() { 913 return this.evictionThread; 914 } 915 916 /* 917 * Eviction thread. Sits in waiting state until an eviction is triggered 918 * when the cache size grows above the acceptable level.<p> 919 * 920 * Thread is triggered into action by {@link LruBlockCache#runEviction()} 921 */ 922 static class EvictionThread extends Thread { 923 924 private WeakReference<LruBlockCache> cache; 925 private volatile boolean go = true; 926 // flag set after enter the run method, used for test 927 private boolean enteringRun = false; 928 929 public EvictionThread(LruBlockCache cache) { 930 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 931 setDaemon(true); 932 this.cache = new WeakReference<>(cache); 933 } 934 935 @Override 936 public void run() { 937 enteringRun = true; 938 while (this.go) { 939 synchronized (this) { 940 try { 941 this.wait(1000 * 10/*Don't wait for ever*/); 942 } catch (InterruptedException e) { 943 LOG.warn("Interrupted eviction thread ", e); 944 Thread.currentThread().interrupt(); 945 } 946 } 947 LruBlockCache cache = this.cache.get(); 948 if (cache == null) break; 949 cache.evict(); 950 } 951 } 952 953 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", 954 justification="This is what we want") 955 public 
void evict() { 956 synchronized (this) { 957 this.notifyAll(); 958 } 959 } 960 961 synchronized void shutdown() { 962 this.go = false; 963 this.notifyAll(); 964 } 965 966 /** 967 * Used for the test. 968 */ 969 boolean isEnteringRun() { 970 return this.enteringRun; 971 } 972 } 973 974 /* 975 * Statistics thread. Periodically prints the cache statistics to the log. 976 */ 977 static class StatisticsThread extends Thread { 978 979 private final LruBlockCache lru; 980 981 public StatisticsThread(LruBlockCache lru) { 982 super("LruBlockCacheStats"); 983 setDaemon(true); 984 this.lru = lru; 985 } 986 987 @Override 988 public void run() { 989 lru.logStats(); 990 } 991 } 992 993 public void logStats() { 994 // Log size 995 long totalSize = heapSize(); 996 long freeSize = maxSize - totalSize; 997 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 998 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 999 "max=" + StringUtils.byteDesc(this.maxSize) + ", " + 1000 "blockCount=" + getBlockCount() + ", " + 1001 "accesses=" + stats.getRequestCount() + ", " + 1002 "hits=" + stats.getHitCount() + ", " + 1003 "hitRatio=" + (stats.getHitCount() == 0 ? 1004 "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + 1005 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 1006 "cachingHits=" + stats.getHitCachingCount() + ", " + 1007 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 1008 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 1009 "evictions=" + stats.getEvictionCount() + ", " + 1010 "evicted=" + stats.getEvictedCount() + ", " + 1011 "evictedPerRun=" + stats.evictedPerEviction()); 1012 } 1013 1014 /** 1015 * Get counter statistics for this cache. 1016 * 1017 * <p>Includes: total accesses, hits, misses, evicted blocks, and runs 1018 * of the eviction processes. 
1019 */ 1020 @Override 1021 public CacheStats getStats() { 1022 return this.stats; 1023 } 1024 1025 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align( 1026 (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) + 1027 (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN) 1028 + ClassSize.OBJECT); 1029 1030 @Override 1031 public long heapSize() { 1032 return getCurrentSize(); 1033 } 1034 1035 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 1036 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 1037 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 1038 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 1039 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 1040 } 1041 1042 @Override 1043 public Iterator<CachedBlock> iterator() { 1044 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 1045 1046 return new Iterator<CachedBlock>() { 1047 private final long now = System.nanoTime(); 1048 1049 @Override 1050 public boolean hasNext() { 1051 return iterator.hasNext(); 1052 } 1053 1054 @Override 1055 public CachedBlock next() { 1056 final LruCachedBlock b = iterator.next(); 1057 return new CachedBlock() { 1058 @Override 1059 public String toString() { 1060 return BlockCacheUtil.toString(this, now); 1061 } 1062 1063 @Override 1064 public BlockPriority getBlockPriority() { 1065 return b.getPriority(); 1066 } 1067 1068 @Override 1069 public BlockType getBlockType() { 1070 return b.getBuffer().getBlockType(); 1071 } 1072 1073 @Override 1074 public long getOffset() { 1075 return b.getCacheKey().getOffset(); 1076 } 1077 1078 @Override 1079 public long getSize() { 1080 return b.getBuffer().heapSize(); 1081 } 1082 1083 @Override 1084 public long getCachedTime() { 1085 return b.getCachedTime(); 1086 } 1087 1088 @Override 1089 public String getFilename() { 1090 return b.getCacheKey().getHfileName(); 1091 } 1092 1093 @Override 1094 public int compareTo(CachedBlock 
other) { 1095 int diff = this.getFilename().compareTo(other.getFilename()); 1096 if (diff != 0) return diff; 1097 diff = Long.compare(this.getOffset(), other.getOffset()); 1098 if (diff != 0) return diff; 1099 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1100 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1101 } 1102 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1103 } 1104 1105 @Override 1106 public int hashCode() { 1107 return b.hashCode(); 1108 } 1109 1110 @Override 1111 public boolean equals(Object obj) { 1112 if (obj instanceof CachedBlock) { 1113 CachedBlock cb = (CachedBlock)obj; 1114 return compareTo(cb) == 0; 1115 } else { 1116 return false; 1117 } 1118 } 1119 }; 1120 } 1121 1122 @Override 1123 public void remove() { 1124 throw new UnsupportedOperationException(); 1125 } 1126 }; 1127 } 1128 1129 // Simple calculators of sizes given factors and maxSize 1130 1131 long acceptableSize() { 1132 return (long)Math.floor(this.maxSize * this.acceptableFactor); 1133 } 1134 private long minSize() { 1135 return (long)Math.floor(this.maxSize * this.minFactor); 1136 } 1137 private long singleSize() { 1138 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1139 } 1140 private long multiSize() { 1141 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1142 } 1143 private long memorySize() { 1144 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1145 } 1146 1147 @Override 1148 public void shutdown() { 1149 if (victimHandler != null) { 1150 victimHandler.shutdown(); 1151 } 1152 this.scheduleThreadPool.shutdown(); 1153 for (int i = 0; i < 10; i++) { 1154 if (!this.scheduleThreadPool.isShutdown()) { 1155 try { 1156 Thread.sleep(10); 1157 } catch (InterruptedException e) { 1158 LOG.warn("Interrupted while sleeping"); 1159 Thread.currentThread().interrupt(); 1160 break; 1161 } 1162 } 1163 } 1164 1165 if 
(!this.scheduleThreadPool.isShutdown()) { 1166 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow(); 1167 LOG.debug("Still running " + runnables); 1168 } 1169 this.evictionThread.shutdown(); 1170 } 1171 1172 /** Clears the cache. Used in tests. */ 1173 @VisibleForTesting 1174 public void clearCache() { 1175 this.map.clear(); 1176 this.elements.set(0); 1177 } 1178 1179 /** 1180 * Used in testing. May be very inefficient. 1181 * 1182 * @return the set of cached file names 1183 */ 1184 @VisibleForTesting 1185 SortedSet<String> getCachedFileNamesForTest() { 1186 SortedSet<String> fileNames = new TreeSet<>(); 1187 for (BlockCacheKey cacheKey : map.keySet()) { 1188 fileNames.add(cacheKey.getHfileName()); 1189 } 1190 return fileNames; 1191 } 1192 1193 @VisibleForTesting 1194 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1195 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1196 for (LruCachedBlock block : map.values()) { 1197 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1198 Integer count = counts.get(encoding); 1199 counts.put(encoding, (count == null ? 0 : count) + 1); 1200 } 1201 return counts; 1202 } 1203 1204 @VisibleForTesting 1205 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1206 return map; 1207 } 1208 1209 @Override 1210 public BlockCache[] getBlockCaches() { 1211 if (victimHandler != null) { 1212 return new BlockCache[] { this, this.victimHandler }; 1213 } 1214 return null; 1215 } 1216}