001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.lang.ref.WeakReference; 021import java.util.EnumMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.PriorityQueue; 026import java.util.SortedSet; 027import java.util.TreeSet; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.atomic.LongAdder; 034import java.util.concurrent.locks.ReentrantLock; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.io.HeapSize; 037import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.ClassSize; 040import org.apache.hadoop.hbase.util.HasThread; 041import org.apache.hadoop.util.StringUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 047import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 048import org.apache.hbase.thirdparty.com.google.common.base.Objects; 049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 050 051/** 052 * A block cache implementation that is memory-aware using {@link HeapSize}, 053 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a 054 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 055 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p> 056 * 057 * Contains three levels of block priority to allow for scan-resistance and in-memory families 058 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 059 * family is a column family that should be served from memory if possible): 060 * single-access, multiple-accesses, and in-memory priority. 061 * A block is added with an in-memory priority flag if 062 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a 063 * single access priority the first time it is read into this block cache. If a block is 064 * accessed again while in cache, it is marked as a multiple access priority block. This 065 * delineation of blocks is used to prevent scans from thrashing the cache adding a 066 * least-frequently-used element to the eviction algorithm.<p> 067 * 068 * Each priority is given its own chunk of the total cache to ensure 069 * fairness during eviction. Each priority will retain close to its maximum 070 * size, however, if any priority is not using its entire chunk the others 071 * are able to grow beyond their chunk size.<p> 072 * 073 * Instantiated at a minimum with the total size and average block size. 074 * All sizes are in bytes. The block size is not especially important as this 075 * cache is fully dynamic in its sizing of blocks. It is only used for 076 * pre-allocating data structures and in initial heap estimation of the map.<p> 077 * 078 * The detailed constructor defines the sizes for the three priorities (they 079 * should total to the <code>maximum size</code> defined). It also sets the levels that 080 * trigger and control the eviction thread.<p> 081 * 082 * The <code>acceptable size</code> is the cache size level which triggers the eviction 083 * process to start. It evicts enough blocks to get the size below the 084 * minimum size specified.<p> 085 * 086 * Eviction happens in a separate thread and involves a single full-scan 087 * of the map. It determines how many bytes must be freed to reach the minimum 088 * size, and then while scanning determines the fewest least-recently-used 089 * blocks necessary from each of the three priorities (would be 3 times bytes 090 * to free). It then uses the priority chunk sizes to evict fairly according 091 * to the relative sizes and usage. 092 */ 093@InterfaceAudience.Private 094public class LruBlockCache implements ResizableBlockCache, HeapSize { 095 096 private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class); 097 098 /** 099 * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep 100 * evicting during an eviction run till the cache size is down to 80% of the total. 101 */ 102 private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor"; 103 104 /** 105 * Acceptable size of cache (no evictions if size < acceptable) 106 */ 107 private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = 108 "hbase.lru.blockcache.acceptable.factor"; 109 110 /** 111 * Hard capacity limit of cache, will reject any put if size > this * acceptable 112 */ 113 static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = 114 "hbase.lru.blockcache.hard.capacity.limit.factor"; 115 private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = 116 "hbase.lru.blockcache.single.percentage"; 117 private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = 118 "hbase.lru.blockcache.multi.percentage"; 119 private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = 120 "hbase.lru.blockcache.memory.percentage"; 121 122 /** 123 * Configuration key to force data-block always (except in-memory are too much) 124 * cached in memory for in-memory hfile, unlike inMemory, which is a column-family 125 * configuration, inMemoryForceMode is a cluster-wide configuration 126 */ 127 private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = 128 "hbase.lru.rs.inmemoryforcemode"; 129 130 /* Default Configuration Parameters*/ 131 132 /* Backing Concurrent Map Configuration */ 133 static final float DEFAULT_LOAD_FACTOR = 0.75f; 134 static final int DEFAULT_CONCURRENCY_LEVEL = 16; 135 136 /* Eviction thresholds */ 137 private static final float DEFAULT_MIN_FACTOR = 0.95f; 138 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f; 139 140 /* Priority buckets */ 141 private static final float DEFAULT_SINGLE_FACTOR = 0.25f; 142 private static final float DEFAULT_MULTI_FACTOR = 0.50f; 143 private static final float DEFAULT_MEMORY_FACTOR = 0.25f; 144 145 private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f; 146 147 private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false; 148 149 /* Statistics thread */ 150 private static final int STAT_THREAD_PERIOD = 60 * 5; 151 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; 152 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 153 154 /** Concurrent map (the cache) */ 155 private transient final Map<BlockCacheKey, LruCachedBlock> map; 156 157 /** Eviction lock (locked when eviction in process) */ 158 private transient final ReentrantLock evictionLock = new ReentrantLock(true); 159 160 private final long maxBlockSize; 161 162 /** Volatile boolean to track if we are in an eviction process or not */ 163 private volatile boolean evictionInProgress = false; 164 165 /** Eviction thread */ 166 private transient final EvictionThread evictionThread; 167 168 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 169 private transient final ScheduledExecutorService scheduleThreadPool = 170 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 171 .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build()); 172 173 /** Current size of cache */ 174 private final AtomicLong size; 175 176 /** Current size of data blocks */ 177 private final LongAdder dataBlockSize; 178 179 /** Current number of cached elements */ 180 private final AtomicLong elements; 181 182 /** Current number of cached data block elements */ 183 private final LongAdder dataBlockElements; 184 185 /** Cache access count (sequential ID) */ 186 private final AtomicLong count; 187 188 /** hard capacity limit */ 189 private float hardCapacityLimitFactor; 190 191 /** Cache statistics */ 192 private final CacheStats stats; 193 194 /** Maximum allowable size of cache (block put if size > max, evict) */ 195 private long maxSize; 196 197 /** Approximate block size */ 198 private long blockSize; 199 200 /** Acceptable size of cache (no evictions if size < acceptable) */ 201 private float acceptableFactor; 202 203 /** Minimum threshold of cache (when evicting, evict until size < min) */ 204 private float minFactor; 205 206 /** Single access bucket size */ 207 private float singleFactor; 208 209 /** Multiple access bucket size */ 210 private float multiFactor; 211 212 /** In-memory bucket size */ 213 private float memoryFactor; 214 215 /** Overhead of the structure itself */ 216 private long overhead; 217 218 /** Whether in-memory hfile's data block has higher priority when evicting */ 219 private boolean forceInMemory; 220 221 /** 222 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an 223 * external cache as L2. 224 * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache 225 */ 226 private transient BlockCache victimHandler = null; 227 228 /** 229 * Default constructor. Specify maximum size and expected average block 230 * size (approximation is fine). 231 * 232 * <p>All other factors will be calculated based on defaults specified in 233 * this class. 234 * 235 * @param maxSize maximum size of cache, in bytes 236 * @param blockSize approximate size of each block, in bytes 237 */ 238 public LruBlockCache(long maxSize, long blockSize) { 239 this(maxSize, blockSize, true); 240 } 241 242 /** 243 * Constructor used for testing. Allows disabling of the eviction thread. 244 */ 245 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { 246 this(maxSize, blockSize, evictionThread, 247 (int) Math.ceil(1.2 * maxSize / blockSize), 248 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, 249 DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, 250 DEFAULT_SINGLE_FACTOR, 251 DEFAULT_MULTI_FACTOR, 252 DEFAULT_MEMORY_FACTOR, 253 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, 254 false, 255 DEFAULT_MAX_BLOCK_SIZE 256 ); 257 } 258 259 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { 260 this(maxSize, blockSize, evictionThread, 261 (int) Math.ceil(1.2 * maxSize / blockSize), 262 DEFAULT_LOAD_FACTOR, 263 DEFAULT_CONCURRENCY_LEVEL, 264 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 265 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 266 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), 267 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), 268 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), 269 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 270 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), 271 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), 272 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE) 273 ); 274 } 275 276 public LruBlockCache(long maxSize, long blockSize, Configuration conf) { 277 this(maxSize, blockSize, true, conf); 278 } 279 280 /** 281 * Configurable constructor. Use this constructor if not using defaults. 282 * 283 * @param maxSize maximum size of this cache, in bytes 284 * @param blockSize expected average size of blocks, in bytes 285 * @param evictionThread whether to run evictions in a bg thread or not 286 * @param mapInitialSize initial size of backing ConcurrentHashMap 287 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 288 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 289 * @param minFactor percentage of total size that eviction will evict until 290 * @param acceptableFactor percentage of total size that triggers eviction 291 * @param singleFactor percentage of total size for single-access blocks 292 * @param multiFactor percentage of total size for multiple-access blocks 293 * @param memoryFactor percentage of total size for in-memory blocks 294 */ 295 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 296 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, 297 float minFactor, float acceptableFactor, float singleFactor, 298 float multiFactor, float memoryFactor, float hardLimitFactor, 299 boolean forceInMemory, long maxBlockSize) { 300 this.maxBlockSize = maxBlockSize; 301 if(singleFactor + multiFactor + memoryFactor != 1 || 302 singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { 303 throw new IllegalArgumentException("Single, multi, and memory factors " + 304 " should be non-negative and total 1.0"); 305 } 306 if (minFactor >= acceptableFactor) { 307 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 308 } 309 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 310 throw new IllegalArgumentException("all factors must be < 1"); 311 } 312 this.maxSize = maxSize; 313 this.blockSize = blockSize; 314 this.forceInMemory = forceInMemory; 315 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 316 this.minFactor = minFactor; 317 this.acceptableFactor = acceptableFactor; 318 this.singleFactor = singleFactor; 319 this.multiFactor = multiFactor; 320 this.memoryFactor = memoryFactor; 321 this.stats = new CacheStats(this.getClass().getSimpleName()); 322 this.count = new AtomicLong(0); 323 this.elements = new AtomicLong(0); 324 this.dataBlockElements = new LongAdder(); 325 this.dataBlockSize = new LongAdder(); 326 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 327 this.size = new AtomicLong(this.overhead); 328 this.hardCapacityLimitFactor = hardLimitFactor; 329 if (evictionThread) { 330 this.evictionThread = new EvictionThread(this); 331 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 332 } else { 333 this.evictionThread = null; 334 } 335 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 336 // every five minutes. 337 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 338 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 339 } 340 341 @Override 342 public void setMaxSize(long maxSize) { 343 this.maxSize = maxSize; 344 if (this.size.get() > acceptableSize() && !evictionInProgress) { 345 runEviction(); 346 } 347 } 348 349 // BlockCache implementation 350 351 /** 352 * Cache the block with the specified name and buffer. 353 * <p> 354 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 355 * this can happen, for which we compare the buffer contents. 356 * 357 * @param cacheKey block's cache key 358 * @param buf block buffer 359 * @param inMemory if block is in-memory 360 */ 361 @Override 362 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 363 if (buf.heapSize() > maxBlockSize) { 364 // If there are a lot of blocks that are too 365 // big this can make the logs way too noisy. 366 // So we log 2% 367 if (stats.failInsert() % 50 == 0) { 368 LOG.warn("Trying to cache too large a block " 369 + cacheKey.getHfileName() + " @ " 370 + cacheKey.getOffset() 371 + " is " + buf.heapSize() 372 + " which is larger than " + maxBlockSize); 373 } 374 return; 375 } 376 377 LruCachedBlock cb = map.get(cacheKey); 378 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 379 return; 380 } 381 long currentSize = size.get(); 382 long currentAcceptableSize = acceptableSize(); 383 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 384 if (currentSize >= hardLimitSize) { 385 stats.failInsert(); 386 if (LOG.isTraceEnabled()) { 387 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 388 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 389 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 390 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 391 } 392 if (!evictionInProgress) { 393 runEviction(); 394 } 395 return; 396 } 397 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 398 long newSize = updateSizeMetrics(cb, false); 399 map.put(cacheKey, cb); 400 long val = elements.incrementAndGet(); 401 if (buf.getBlockType().isData()) { 402 dataBlockElements.increment(); 403 } 404 if (LOG.isTraceEnabled()) { 405 long size = map.size(); 406 assertCounterSanity(size, val); 407 } 408 if (newSize > currentAcceptableSize && !evictionInProgress) { 409 runEviction(); 410 } 411 } 412 413 /** 414 * Sanity-checking for parity between actual block cache content and metrics. 415 * Intended only for use with TRACE level logging and -ea JVM. 416 */ 417 private static void assertCounterSanity(long mapSize, long counterVal) { 418 if (counterVal < 0) { 419 LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + 420 ", mapSize=" + mapSize); 421 return; 422 } 423 if (mapSize < Integer.MAX_VALUE) { 424 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 425 if (pct_diff > 0.05) { 426 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + 427 ", mapSize=" + mapSize); 428 } 429 } 430 } 431 432 /** 433 * Cache the block with the specified name and buffer. 434 * <p> 435 * 436 * @param cacheKey block's cache key 437 * @param buf block buffer 438 */ 439 @Override 440 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 441 cacheBlock(cacheKey, buf, false); 442 } 443 444 /** 445 * Helper function that updates the local size counter and also updates any 446 * per-cf or per-blocktype metrics it can discern from given 447 * {@link LruCachedBlock} 448 */ 449 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 450 long heapsize = cb.heapSize(); 451 BlockType bt = cb.getBuffer().getBlockType(); 452 if (evict) { 453 heapsize *= -1; 454 } 455 if (bt != null && bt.isData()) { 456 dataBlockSize.add(heapsize); 457 } 458 return size.addAndGet(heapsize); 459 } 460 461 /** 462 * Get the buffer of the block with the specified name. 463 * 464 * @param cacheKey block's cache key 465 * @param caching true if the caller caches blocks on cache misses 466 * @param repeat Whether this is a repeat lookup for the same block 467 * (used to avoid double counting cache misses when doing double-check 468 * locking) 469 * @param updateCacheMetrics Whether to update cache metrics or not 470 * 471 * @return buffer of specified cache key, or null if not in cache 472 */ 473 @Override 474 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 475 boolean updateCacheMetrics) { 476 LruCachedBlock cb = map.get(cacheKey); 477 if (cb == null) { 478 if (!repeat && updateCacheMetrics) { 479 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 480 } 481 // If there is another block cache then try and read there. 482 // However if this is a retry ( second time in double checked locking ) 483 // And it's already a miss then the l2 will also be a miss. 484 if (victimHandler != null && !repeat) { 485 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 486 487 // Promote this to L1. 488 if (result != null && caching) { 489 if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) { 490 result = ((HFileBlock) result).deepClone(); 491 } 492 cacheBlock(cacheKey, result, /* inMemory = */ false); 493 } 494 return result; 495 } 496 return null; 497 } 498 if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 499 cb.access(count.incrementAndGet()); 500 return cb.getBuffer(); 501 } 502 503 /** 504 * Whether the cache contains block with specified cacheKey 505 * 506 * @return true if contains the block 507 */ 508 public boolean containsBlock(BlockCacheKey cacheKey) { 509 return map.containsKey(cacheKey); 510 } 511 512 @Override 513 public boolean evictBlock(BlockCacheKey cacheKey) { 514 LruCachedBlock cb = map.get(cacheKey); 515 return cb != null && evictBlock(cb, false) > 0; 516 } 517 518 /** 519 * Evicts all blocks for a specific HFile. This is an 520 * expensive operation implemented as a linear-time search through all blocks 521 * in the cache. Ideally this should be a search in a log-access-time map. 522 * 523 * <p> 524 * This is used for evict-on-close to remove all blocks of a specific HFile. 525 * 526 * @return the number of blocks evicted 527 */ 528 @Override 529 public int evictBlocksByHfileName(String hfileName) { 530 int numEvicted = 0; 531 for (BlockCacheKey key : map.keySet()) { 532 if (key.getHfileName().equals(hfileName)) { 533 if (evictBlock(key)) 534 ++numEvicted; 535 } 536 } 537 if (victimHandler != null) { 538 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 539 } 540 return numEvicted; 541 } 542 543 /** 544 * Evict the block, and it will be cached by the victim handler if exists && 545 * block may be read again later 546 * 547 * @param evictedByEvictionProcess true if the given block is evicted by 548 * EvictionThread 549 * @return the heap size of evicted block 550 */ 551 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 552 boolean found = map.remove(block.getCacheKey()) != null; 553 if (!found) { 554 return 0; 555 } 556 updateSizeMetrics(block, true); 557 long val = elements.decrementAndGet(); 558 if (LOG.isTraceEnabled()) { 559 long size = map.size(); 560 assertCounterSanity(size, val); 561 } 562 if (block.getBuffer().getBlockType().isData()) { 563 dataBlockElements.decrement(); 564 } 565 if (evictedByEvictionProcess) { 566 // When the eviction of the block happened because of invalidation of HFiles, no need to 567 // update the stats counter. 568 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 569 if (victimHandler != null) { 570 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 571 } 572 } 573 return block.heapSize(); 574 } 575 576 /** 577 * Multi-threaded call to run the eviction process. 578 */ 579 private void runEviction() { 580 if (evictionThread == null) { 581 evict(); 582 } else { 583 evictionThread.evict(); 584 } 585 } 586 587 @VisibleForTesting 588 boolean isEvictionInProgress() { 589 return evictionInProgress; 590 } 591 592 @VisibleForTesting 593 long getOverhead() { 594 return overhead; 595 } 596 597 /** 598 * Eviction method. 599 */ 600 void evict() { 601 602 // Ensure only one eviction at a time 603 if(!evictionLock.tryLock()) return; 604 605 try { 606 evictionInProgress = true; 607 long currentSize = this.size.get(); 608 long bytesToFree = currentSize - minSize(); 609 610 if (LOG.isTraceEnabled()) { 611 LOG.trace("Block cache LRU eviction started; Attempting to free " + 612 StringUtils.byteDesc(bytesToFree) + " of total=" + 613 StringUtils.byteDesc(currentSize)); 614 } 615 616 if (bytesToFree <= 0) return; 617 618 // Instantiate priority buckets 619 BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize()); 620 BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); 621 BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); 622 623 // Scan entire map putting into appropriate buckets 624 for (LruCachedBlock cachedBlock : map.values()) { 625 switch (cachedBlock.getPriority()) { 626 case SINGLE: { 627 bucketSingle.add(cachedBlock); 628 break; 629 } 630 case MULTI: { 631 bucketMulti.add(cachedBlock); 632 break; 633 } 634 case MEMORY: { 635 bucketMemory.add(cachedBlock); 636 break; 637 } 638 } 639 } 640 641 long bytesFreed = 0; 642 if (forceInMemory || memoryFactor > 0.999f) { 643 long s = bucketSingle.totalSize(); 644 long m = bucketMulti.totalSize(); 645 if (bytesToFree > (s + m)) { 646 // this means we need to evict blocks in memory bucket to make room, 647 // so the single and multi buckets will be emptied 648 bytesFreed = bucketSingle.free(s); 649 bytesFreed += bucketMulti.free(m); 650 if (LOG.isTraceEnabled()) { 651 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + 652 " from single and multi buckets"); 653 } 654 bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); 655 if (LOG.isTraceEnabled()) { 656 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + 657 " total from all three buckets "); 658 } 659 } else { 660 // this means no need to evict block in memory bucket, 661 // and we try best to make the ratio between single-bucket and 662 // multi-bucket is 1:2 663 long bytesRemain = s + m - bytesToFree; 664 if (3 * s <= bytesRemain) { 665 // single-bucket is small enough that no eviction happens for it 666 // hence all eviction goes from multi-bucket 667 bytesFreed = bucketMulti.free(bytesToFree); 668 } else if (3 * m <= 2 * bytesRemain) { 669 // multi-bucket is small enough that no eviction happens for it 670 // hence all eviction goes from single-bucket 671 bytesFreed = bucketSingle.free(bytesToFree); 672 } else { 673 // both buckets need to evict some blocks 674 bytesFreed = bucketSingle.free(s - bytesRemain / 3); 675 if (bytesFreed < bytesToFree) { 676 bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); 677 } 678 } 679 } 680 } else { 681 PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); 682 683 bucketQueue.add(bucketSingle); 684 bucketQueue.add(bucketMulti); 685 bucketQueue.add(bucketMemory); 686 687 int remainingBuckets = bucketQueue.size(); 688 689 BlockBucket bucket; 690 while ((bucket = bucketQueue.poll()) != null) { 691 long overflow = bucket.overflow(); 692 if (overflow > 0) { 693 long bucketBytesToFree = 694 Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); 695 bytesFreed += bucket.free(bucketBytesToFree); 696 } 697 remainingBuckets--; 698 } 699 } 700 if (LOG.isTraceEnabled()) { 701 long single = bucketSingle.totalSize(); 702 long multi = bucketMulti.totalSize(); 703 long memory = bucketMemory.totalSize(); 704 LOG.trace("Block cache LRU eviction completed; " + 705 "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + 706 "total=" + StringUtils.byteDesc(this.size.get()) + ", " + 707 "single=" + StringUtils.byteDesc(single) + ", " + 708 "multi=" + StringUtils.byteDesc(multi) + ", " + 709 "memory=" + StringUtils.byteDesc(memory)); 710 } 711 } finally { 712 stats.evict(); 713 evictionInProgress = false; 714 evictionLock.unlock(); 715 } 716 } 717 718 @Override 719 public String toString() { 720 return MoreObjects.toStringHelper(this) 721 .add("blockCount", getBlockCount()) 722 .add("currentSize", StringUtils.byteDesc(getCurrentSize())) 723 .add("freeSize", StringUtils.byteDesc(getFreeSize())) 724 .add("maxSize", StringUtils.byteDesc(getMaxSize())) 725 .add("heapSize", StringUtils.byteDesc(heapSize())) 726 .add("minSize", StringUtils.byteDesc(minSize())) 727 .add("minFactor", minFactor) 728 .add("multiSize", StringUtils.byteDesc(multiSize())) 729 .add("multiFactor", multiFactor) 730 .add("singleSize", StringUtils.byteDesc(singleSize())) 731 .add("singleFactor", singleFactor) 732 .toString(); 733 } 734 735 /** 736 * Used to group blocks into priority buckets. There will be a BlockBucket 737 * for each priority (single, multi, memory). Once bucketed, the eviction 738 * algorithm takes the appropriate number of elements out of each according 739 * to configuration parameters and their relatives sizes. 740 */ 741 private class BlockBucket implements Comparable<BlockBucket> { 742 743 private final String name; 744 private LruCachedBlockQueue queue; 745 private long totalSize = 0; 746 private long bucketSize; 747 748 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 749 this.name = name; 750 this.bucketSize = bucketSize; 751 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 752 totalSize = 0; 753 } 754 755 public void add(LruCachedBlock block) { 756 totalSize += block.heapSize(); 757 queue.add(block); 758 } 759 760 public long free(long toFree) { 761 if (LOG.isTraceEnabled()) { 762 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 763 } 764 LruCachedBlock cb; 765 long freedBytes = 0; 766 while ((cb = queue.pollLast()) != null) { 767 freedBytes += evictBlock(cb, true); 768 if (freedBytes >= toFree) { 769 return freedBytes; 770 } 771 } 772 if (LOG.isTraceEnabled()) { 773 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 774 } 775 return freedBytes; 776 } 777 778 public long overflow() { 779 return totalSize - bucketSize; 780 } 781 782 public long totalSize() { 783 return totalSize; 784 } 785 786 @Override 787 public int compareTo(BlockBucket that) { 788 return Long.compare(this.overflow(), that.overflow()); 789 } 790 791 @Override 792 public boolean equals(Object that) { 793 if (that == null || !(that instanceof BlockBucket)) { 794 return false; 795 } 796 return compareTo((BlockBucket)that) == 0; 797 } 798 799 @Override 800 public int hashCode() { 801 return Objects.hashCode(name, bucketSize, queue, totalSize); 802 } 803 804 @Override 805 public String toString() { 806 return MoreObjects.toStringHelper(this) 807 .add("name", name) 808 .add("totalSize", StringUtils.byteDesc(totalSize)) 809 .add("bucketSize", StringUtils.byteDesc(bucketSize)) 810 .toString(); 811 } 812 } 813 814 /** 815 * Get the maximum size of this cache. 816 * 817 * @return max size in bytes 818 */ 819 820 @Override 821 public long getMaxSize() { 822 return this.maxSize; 823 } 824 825 @Override 826 public long getCurrentSize() { 827 return this.size.get(); 828 } 829 830 @Override 831 public long getCurrentDataSize() { 832 return this.dataBlockSize.sum(); 833 } 834 835 @Override 836 public long getFreeSize() { 837 return getMaxSize() - getCurrentSize(); 838 } 839 840 @Override 841 public long size() { 842 return getMaxSize(); 843 } 844 845 @Override 846 public long getBlockCount() { 847 return this.elements.get(); 848 } 849 850 @Override 851 public long getDataBlockCount() { 852 return this.dataBlockElements.sum(); 853 } 854 855 EvictionThread getEvictionThread() { 856 return this.evictionThread; 857 } 858 859 /* 860 * Eviction thread. Sits in waiting state until an eviction is triggered 861 * when the cache size grows above the acceptable level.<p> 862 * 863 * Thread is triggered into action by {@link LruBlockCache#runEviction()} 864 */ 865 static class EvictionThread extends HasThread { 866 867 private WeakReference<LruBlockCache> cache; 868 private volatile boolean go = true; 869 // flag set after enter the run method, used for test 870 private boolean enteringRun = false; 871 872 public EvictionThread(LruBlockCache cache) { 873 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 874 setDaemon(true); 875 this.cache = new WeakReference<>(cache); 876 } 877 878 @Override 879 public void run() { 880 enteringRun = true; 881 while (this.go) { 882 synchronized (this) { 883 try { 884 this.wait(1000 * 10/*Don't wait for ever*/); 885 } catch (InterruptedException e) { 886 LOG.warn("Interrupted eviction thread ", e); 887 Thread.currentThread().interrupt(); 888 } 889 } 890 LruBlockCache cache = this.cache.get(); 891 if (cache == null) break; 892 cache.evict(); 893 } 894 } 895 896 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", 897 justification="This is what we want") 898 public void evict() { 899 synchronized (this) { 900 this.notifyAll(); 901 } 902 } 903 904 synchronized void shutdown() { 905 this.go = false; 906 this.notifyAll(); 907 } 908 909 /** 910 * Used for the test. 911 */ 912 boolean isEnteringRun() { 913 return this.enteringRun; 914 } 915 } 916 917 /* 918 * Statistics thread. Periodically prints the cache statistics to the log. 919 */ 920 static class StatisticsThread extends Thread { 921 922 private final LruBlockCache lru; 923 924 public StatisticsThread(LruBlockCache lru) { 925 super("LruBlockCacheStats"); 926 setDaemon(true); 927 this.lru = lru; 928 } 929 930 @Override 931 public void run() { 932 lru.logStats(); 933 } 934 } 935 936 public void logStats() { 937 // Log size 938 long totalSize = heapSize(); 939 long freeSize = maxSize - totalSize; 940 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 941 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 942 "max=" + StringUtils.byteDesc(this.maxSize) + ", " + 943 "blockCount=" + getBlockCount() + ", " + 944 "accesses=" + stats.getRequestCount() + ", " + 945 "hits=" + stats.getHitCount() + ", " + 946 "hitRatio=" + (stats.getHitCount() == 0 ? 947 "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + 948 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 949 "cachingHits=" + stats.getHitCachingCount() + ", " + 950 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 951 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 952 "evictions=" + stats.getEvictionCount() + ", " + 953 "evicted=" + stats.getEvictedCount() + ", " + 954 "evictedPerRun=" + stats.evictedPerEviction()); 955 } 956 957 /** 958 * Get counter statistics for this cache. 959 * 960 * <p>Includes: total accesses, hits, misses, evicted blocks, and runs 961 * of the eviction processes. 962 */ 963 @Override 964 public CacheStats getStats() { 965 return this.stats; 966 } 967 968 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align( 969 (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) + 970 (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN) 971 + ClassSize.OBJECT); 972 973 @Override 974 public long heapSize() { 975 return getCurrentSize(); 976 } 977 978 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 979 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 980 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 981 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 982 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 983 } 984 985 @Override 986 public Iterator<CachedBlock> iterator() { 987 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 988 989 return new Iterator<CachedBlock>() { 990 private final long now = System.nanoTime(); 991 992 @Override 993 public boolean hasNext() { 994 return iterator.hasNext(); 995 } 996 997 @Override 998 public CachedBlock next() { 999 final LruCachedBlock b = iterator.next(); 1000 return new CachedBlock() { 1001 @Override 1002 public String toString() { 1003 return BlockCacheUtil.toString(this, now); 1004 } 1005 1006 @Override 1007 public BlockPriority getBlockPriority() { 1008 return b.getPriority(); 1009 } 1010 1011 @Override 1012 public BlockType getBlockType() { 1013 return b.getBuffer().getBlockType(); 1014 } 1015 1016 @Override 1017 public long getOffset() { 1018 return b.getCacheKey().getOffset(); 1019 } 1020 1021 @Override 1022 public long getSize() { 1023 return b.getBuffer().heapSize(); 1024 } 1025 1026 @Override 1027 public long getCachedTime() { 1028 return b.getCachedTime(); 1029 } 1030 1031 @Override 1032 public String getFilename() { 1033 return b.getCacheKey().getHfileName(); 1034 } 1035 1036 @Override 1037 public int compareTo(CachedBlock other) { 1038 int diff = this.getFilename().compareTo(other.getFilename()); 1039 if (diff != 0) return diff; 1040 diff = Long.compare(this.getOffset(), other.getOffset()); 1041 if (diff != 0) return diff; 1042 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1043 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1044 } 1045 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1046 } 1047 1048 @Override 1049 public int hashCode() { 1050 return b.hashCode(); 1051 } 1052 1053 @Override 1054 public boolean equals(Object obj) { 1055 if (obj instanceof CachedBlock) { 1056 CachedBlock cb = (CachedBlock)obj; 1057 return compareTo(cb) == 0; 1058 } else { 1059 return false; 1060 } 1061 } 1062 }; 1063 } 1064 1065 @Override 1066 public void remove() { 1067 throw new UnsupportedOperationException(); 1068 } 1069 }; 1070 } 1071 1072 // Simple calculators of sizes given factors and maxSize 1073 1074 long acceptableSize() { 1075 return (long)Math.floor(this.maxSize * this.acceptableFactor); 1076 } 1077 private long minSize() { 1078 return (long)Math.floor(this.maxSize * this.minFactor); 1079 } 1080 private long singleSize() { 1081 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1082 } 1083 private long multiSize() { 1084 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1085 } 1086 private long memorySize() { 1087 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1088 } 1089 1090 @Override 1091 public void shutdown() { 1092 if (victimHandler != null) { 1093 victimHandler.shutdown(); 1094 } 1095 this.scheduleThreadPool.shutdown(); 1096 for (int i = 0; i < 10; i++) { 1097 if (!this.scheduleThreadPool.isShutdown()) { 1098 try { 1099 Thread.sleep(10); 1100 } catch (InterruptedException e) { 1101 LOG.warn("Interrupted while sleeping"); 1102 Thread.currentThread().interrupt(); 1103 break; 1104 } 1105 } 1106 } 1107 1108 if (!this.scheduleThreadPool.isShutdown()) { 1109 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow(); 1110 LOG.debug("Still running " + runnables); 1111 } 1112 this.evictionThread.shutdown(); 1113 } 1114 1115 /** Clears the cache. Used in tests. */ 1116 @VisibleForTesting 1117 public void clearCache() { 1118 this.map.clear(); 1119 this.elements.set(0); 1120 } 1121 1122 /** 1123 * Used in testing. May be very inefficient. 1124 * 1125 * @return the set of cached file names 1126 */ 1127 @VisibleForTesting 1128 SortedSet<String> getCachedFileNamesForTest() { 1129 SortedSet<String> fileNames = new TreeSet<>(); 1130 for (BlockCacheKey cacheKey : map.keySet()) { 1131 fileNames.add(cacheKey.getHfileName()); 1132 } 1133 return fileNames; 1134 } 1135 1136 @VisibleForTesting 1137 Map<BlockType, Integer> getBlockTypeCountsForTest() { 1138 Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class); 1139 for (LruCachedBlock cb : map.values()) { 1140 BlockType blockType = cb.getBuffer().getBlockType(); 1141 Integer count = counts.get(blockType); 1142 counts.put(blockType, (count == null ? 0 : count) + 1); 1143 } 1144 return counts; 1145 } 1146 1147 @VisibleForTesting 1148 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1149 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1150 for (LruCachedBlock block : map.values()) { 1151 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1152 Integer count = counts.get(encoding); 1153 counts.put(encoding, (count == null ? 0 : count) + 1); 1154 } 1155 return counts; 1156 } 1157 1158 public void setVictimCache(BlockCache handler) { 1159 assert victimHandler == null; 1160 victimHandler = handler; 1161 } 1162 1163 @VisibleForTesting 1164 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1165 return map; 1166 } 1167 1168 @Override 1169 public BlockCache[] getBlockCaches() { 1170 if (victimHandler != null) { 1171 return new BlockCache[] { this, this.victimHandler }; 1172 } 1173 return null; 1174 } 1175}