001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile; 020 021import java.lang.ref.WeakReference; 022import java.util.EnumMap; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.PriorityQueue; 027import java.util.SortedSet; 028import java.util.TreeSet; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import java.util.concurrent.atomic.LongAdder; 035import java.util.concurrent.locks.ReentrantLock; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041import org.apache.hadoop.hbase.io.HeapSize; 042import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.ClassSize; 045import org.apache.hadoop.hbase.util.HasThread; 046import org.apache.hadoop.util.StringUtils; 047 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 050import org.apache.hbase.thirdparty.com.google.common.base.Objects; 051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 052 053import com.fasterxml.jackson.annotation.JsonIgnore; 054import com.fasterxml.jackson.annotation.JsonIgnoreProperties; 055 056/** 057 * A block cache implementation that is memory-aware using {@link HeapSize}, 058 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a 059 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 060 * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p> 061 * 062 * Contains three levels of block priority to allow for scan-resistance and in-memory families 063 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 064 * family is a column family that should be served from memory if possible): 065 * single-access, multiple-accesses, and in-memory priority. 066 * A block is added with an in-memory priority flag if 067 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a 068 * single access priority the first time it is read into this block cache. If a block is 069 * accessed again while in cache, it is marked as a multiple access priority block. This 070 * delineation of blocks is used to prevent scans from thrashing the cache adding a 071 * least-frequently-used element to the eviction algorithm.<p> 072 * 073 * Each priority is given its own chunk of the total cache to ensure 074 * fairness during eviction. Each priority will retain close to its maximum 075 * size, however, if any priority is not using its entire chunk the others 076 * are able to grow beyond their chunk size.<p> 077 * 078 * Instantiated at a minimum with the total size and average block size. 079 * All sizes are in bytes. The block size is not especially important as this 080 * cache is fully dynamic in its sizing of blocks. It is only used for 081 * pre-allocating data structures and in initial heap estimation of the map.<p> 082 * 083 * The detailed constructor defines the sizes for the three priorities (they 084 * should total to the <code>maximum size</code> defined). It also sets the levels that 085 * trigger and control the eviction thread.<p> 086 * 087 * The <code>acceptable size</code> is the cache size level which triggers the eviction 088 * process to start. It evicts enough blocks to get the size below the 089 * minimum size specified.<p> 090 * 091 * Eviction happens in a separate thread and involves a single full-scan 092 * of the map. It determines how many bytes must be freed to reach the minimum 093 * size, and then while scanning determines the fewest least-recently-used 094 * blocks necessary from each of the three priorities (would be 3 times bytes 095 * to free). It then uses the priority chunk sizes to evict fairly according 096 * to the relative sizes and usage. 097 */ 098@InterfaceAudience.Private 099@JsonIgnoreProperties({"encodingCountsForTest"}) 100public class LruBlockCache implements ResizableBlockCache, HeapSize { 101 102 private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class); 103 104 /** 105 * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep 106 * evicting during an eviction run till the cache size is down to 80% of the total. 107 */ 108 private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor"; 109 110 /** 111 * Acceptable size of cache (no evictions if size < acceptable) 112 */ 113 private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = 114 "hbase.lru.blockcache.acceptable.factor"; 115 116 /** 117 * Hard capacity limit of cache, will reject any put if size > this * acceptable 118 */ 119 static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = 120 "hbase.lru.blockcache.hard.capacity.limit.factor"; 121 private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = 122 "hbase.lru.blockcache.single.percentage"; 123 private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = 124 "hbase.lru.blockcache.multi.percentage"; 125 private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = 126 "hbase.lru.blockcache.memory.percentage"; 127 128 /** 129 * Configuration key to force data-block always (except in-memory are too much) 130 * cached in memory for in-memory hfile, unlike inMemory, which is a column-family 131 * configuration, inMemoryForceMode is a cluster-wide configuration 132 */ 133 private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = 134 "hbase.lru.rs.inmemoryforcemode"; 135 136 /* Default Configuration Parameters*/ 137 138 /* Backing Concurrent Map Configuration */ 139 static final float DEFAULT_LOAD_FACTOR = 0.75f; 140 static final int DEFAULT_CONCURRENCY_LEVEL = 16; 141 142 /* Eviction thresholds */ 143 private static final float DEFAULT_MIN_FACTOR = 0.95f; 144 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f; 145 146 /* Priority buckets */ 147 private static final float DEFAULT_SINGLE_FACTOR = 0.25f; 148 private static final float DEFAULT_MULTI_FACTOR = 0.50f; 149 private static final float DEFAULT_MEMORY_FACTOR = 0.25f; 150 151 private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f; 152 153 private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false; 154 155 /* Statistics thread */ 156 private static final int STAT_THREAD_PERIOD = 60 * 5; 157 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; 158 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 159 160 /** Concurrent map (the cache) */ 161 private final Map<BlockCacheKey, LruCachedBlock> map; 162 163 /** Eviction lock (locked when eviction in process) */ 164 private final ReentrantLock evictionLock = new ReentrantLock(true); 165 private final long maxBlockSize; 166 167 /** Volatile boolean to track if we are in an eviction process or not */ 168 private volatile boolean evictionInProgress = false; 169 170 /** Eviction thread */ 171 private final EvictionThread evictionThread; 172 173 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 174 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, 175 new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build()); 176 177 /** Current size of cache */ 178 private final AtomicLong size; 179 180 /** Current size of data blocks */ 181 private final LongAdder dataBlockSize; 182 183 /** Current number of cached elements */ 184 private final AtomicLong elements; 185 186 /** Current number of cached data block elements */ 187 private final LongAdder dataBlockElements; 188 189 /** Cache access count (sequential ID) */ 190 private final AtomicLong count; 191 192 /** hard capacity limit */ 193 private float hardCapacityLimitFactor; 194 195 /** Cache statistics */ 196 private final CacheStats stats; 197 198 /** Maximum allowable size of cache (block put if size > max, evict) */ 199 private long maxSize; 200 201 /** Approximate block size */ 202 private long blockSize; 203 204 /** Acceptable size of cache (no evictions if size < acceptable) */ 205 private float acceptableFactor; 206 207 /** Minimum threshold of cache (when evicting, evict until size < min) */ 208 private float minFactor; 209 210 /** Single access bucket size */ 211 private float singleFactor; 212 213 /** Multiple access bucket size */ 214 private float multiFactor; 215 216 /** In-memory bucket size */ 217 private float memoryFactor; 218 219 /** Overhead of the structure itself */ 220 private long overhead; 221 222 /** Whether in-memory hfile's data block has higher priority when evicting */ 223 private boolean forceInMemory; 224 225 /** 226 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an 227 * external cache as L2. 228 * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache 229 */ 230 private BlockCache victimHandler = null; 231 232 /** 233 * Default constructor. Specify maximum size and expected average block 234 * size (approximation is fine). 235 * 236 * <p>All other factors will be calculated based on defaults specified in 237 * this class. 238 * 239 * @param maxSize maximum size of cache, in bytes 240 * @param blockSize approximate size of each block, in bytes 241 */ 242 public LruBlockCache(long maxSize, long blockSize) { 243 this(maxSize, blockSize, true); 244 } 245 246 /** 247 * Constructor used for testing. Allows disabling of the eviction thread. 248 */ 249 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { 250 this(maxSize, blockSize, evictionThread, 251 (int) Math.ceil(1.2 * maxSize / blockSize), 252 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, 253 DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, 254 DEFAULT_SINGLE_FACTOR, 255 DEFAULT_MULTI_FACTOR, 256 DEFAULT_MEMORY_FACTOR, 257 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, 258 false, 259 DEFAULT_MAX_BLOCK_SIZE 260 ); 261 } 262 263 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { 264 this(maxSize, blockSize, evictionThread, 265 (int) Math.ceil(1.2 * maxSize / blockSize), 266 DEFAULT_LOAD_FACTOR, 267 DEFAULT_CONCURRENCY_LEVEL, 268 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 269 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 270 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), 271 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), 272 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), 273 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 274 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), 275 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), 276 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE) 277 ); 278 } 279 280 public LruBlockCache(long maxSize, long blockSize, Configuration conf) { 281 this(maxSize, blockSize, true, conf); 282 } 283 284 /** 285 * Configurable constructor. Use this constructor if not using defaults. 286 * 287 * @param maxSize maximum size of this cache, in bytes 288 * @param blockSize expected average size of blocks, in bytes 289 * @param evictionThread whether to run evictions in a bg thread or not 290 * @param mapInitialSize initial size of backing ConcurrentHashMap 291 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 292 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 293 * @param minFactor percentage of total size that eviction will evict until 294 * @param acceptableFactor percentage of total size that triggers eviction 295 * @param singleFactor percentage of total size for single-access blocks 296 * @param multiFactor percentage of total size for multiple-access blocks 297 * @param memoryFactor percentage of total size for in-memory blocks 298 */ 299 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 300 int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, 301 float minFactor, float acceptableFactor, float singleFactor, 302 float multiFactor, float memoryFactor, float hardLimitFactor, 303 boolean forceInMemory, long maxBlockSize) { 304 this.maxBlockSize = maxBlockSize; 305 if(singleFactor + multiFactor + memoryFactor != 1 || 306 singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { 307 throw new IllegalArgumentException("Single, multi, and memory factors " + 308 " should be non-negative and total 1.0"); 309 } 310 if (minFactor >= acceptableFactor) { 311 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 312 } 313 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 314 throw new IllegalArgumentException("all factors must be < 1"); 315 } 316 this.maxSize = maxSize; 317 this.blockSize = blockSize; 318 this.forceInMemory = forceInMemory; 319 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 320 this.minFactor = minFactor; 321 this.acceptableFactor = acceptableFactor; 322 this.singleFactor = singleFactor; 323 this.multiFactor = multiFactor; 324 this.memoryFactor = memoryFactor; 325 this.stats = new CacheStats(this.getClass().getSimpleName()); 326 this.count = new AtomicLong(0); 327 this.elements = new AtomicLong(0); 328 this.dataBlockElements = new LongAdder(); 329 this.dataBlockSize = new LongAdder(); 330 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 331 this.size = new AtomicLong(this.overhead); 332 this.hardCapacityLimitFactor = hardLimitFactor; 333 if (evictionThread) { 334 this.evictionThread = new EvictionThread(this); 335 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 336 } else { 337 this.evictionThread = null; 338 } 339 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 340 // every five minutes. 341 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 342 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 343 } 344 345 @Override 346 public void setMaxSize(long maxSize) { 347 this.maxSize = maxSize; 348 if (this.size.get() > acceptableSize() && !evictionInProgress) { 349 runEviction(); 350 } 351 } 352 353 // BlockCache implementation 354 355 /** 356 * Cache the block with the specified name and buffer. 357 * <p> 358 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 359 * this can happen, for which we compare the buffer contents. 360 * 361 * @param cacheKey block's cache key 362 * @param buf block buffer 363 * @param inMemory if block is in-memory 364 */ 365 @Override 366 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 367 if (buf.heapSize() > maxBlockSize) { 368 // If there are a lot of blocks that are too 369 // big this can make the logs way too noisy. 370 // So we log 2% 371 if (stats.failInsert() % 50 == 0) { 372 LOG.warn("Trying to cache too large a block " 373 + cacheKey.getHfileName() + " @ " 374 + cacheKey.getOffset() 375 + " is " + buf.heapSize() 376 + " which is larger than " + maxBlockSize); 377 } 378 return; 379 } 380 381 LruCachedBlock cb = map.get(cacheKey); 382 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 383 return; 384 } 385 long currentSize = size.get(); 386 long currentAcceptableSize = acceptableSize(); 387 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 388 if (currentSize >= hardLimitSize) { 389 stats.failInsert(); 390 if (LOG.isTraceEnabled()) { 391 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 392 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 393 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 394 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 395 } 396 if (!evictionInProgress) { 397 runEviction(); 398 } 399 return; 400 } 401 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 402 long newSize = updateSizeMetrics(cb, false); 403 map.put(cacheKey, cb); 404 long val = elements.incrementAndGet(); 405 if (buf.getBlockType().isData()) { 406 dataBlockElements.increment(); 407 } 408 if (LOG.isTraceEnabled()) { 409 long size = map.size(); 410 assertCounterSanity(size, val); 411 } 412 if (newSize > currentAcceptableSize && !evictionInProgress) { 413 runEviction(); 414 } 415 } 416 417 /** 418 * Sanity-checking for parity between actual block cache content and metrics. 419 * Intended only for use with TRACE level logging and -ea JVM. 420 */ 421 private static void assertCounterSanity(long mapSize, long counterVal) { 422 if (counterVal < 0) { 423 LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + 424 ", mapSize=" + mapSize); 425 return; 426 } 427 if (mapSize < Integer.MAX_VALUE) { 428 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 429 if (pct_diff > 0.05) { 430 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + 431 ", mapSize=" + mapSize); 432 } 433 } 434 } 435 436 /** 437 * Cache the block with the specified name and buffer. 438 * <p> 439 * 440 * @param cacheKey block's cache key 441 * @param buf block buffer 442 */ 443 @Override 444 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 445 cacheBlock(cacheKey, buf, false); 446 } 447 448 /** 449 * Helper function that updates the local size counter and also updates any 450 * per-cf or per-blocktype metrics it can discern from given 451 * {@link LruCachedBlock} 452 */ 453 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 454 long heapsize = cb.heapSize(); 455 BlockType bt = cb.getBuffer().getBlockType(); 456 if (evict) { 457 heapsize *= -1; 458 } 459 if (bt != null && bt.isData()) { 460 dataBlockSize.add(heapsize); 461 } 462 return size.addAndGet(heapsize); 463 } 464 465 /** 466 * Get the buffer of the block with the specified name. 467 * 468 * @param cacheKey block's cache key 469 * @param caching true if the caller caches blocks on cache misses 470 * @param repeat Whether this is a repeat lookup for the same block 471 * (used to avoid double counting cache misses when doing double-check 472 * locking) 473 * @param updateCacheMetrics Whether to update cache metrics or not 474 * 475 * @return buffer of specified cache key, or null if not in cache 476 */ 477 @Override 478 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 479 boolean updateCacheMetrics) { 480 LruCachedBlock cb = map.get(cacheKey); 481 if (cb == null) { 482 if (!repeat && updateCacheMetrics) { 483 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 484 } 485 // If there is another block cache then try and read there. 486 // However if this is a retry ( second time in double checked locking ) 487 // And it's already a miss then the l2 will also be a miss. 488 if (victimHandler != null && !repeat) { 489 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 490 491 // Promote this to L1. 492 if (result != null && caching) { 493 if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) { 494 result = ((HFileBlock) result).deepClone(); 495 } 496 cacheBlock(cacheKey, result, /* inMemory = */ false); 497 } 498 return result; 499 } 500 return null; 501 } 502 if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 503 cb.access(count.incrementAndGet()); 504 return cb.getBuffer(); 505 } 506 507 /** 508 * Whether the cache contains block with specified cacheKey 509 * 510 * @return true if contains the block 511 */ 512 public boolean containsBlock(BlockCacheKey cacheKey) { 513 return map.containsKey(cacheKey); 514 } 515 516 @Override 517 public boolean evictBlock(BlockCacheKey cacheKey) { 518 LruCachedBlock cb = map.get(cacheKey); 519 return cb != null && evictBlock(cb, false) > 0; 520 } 521 522 /** 523 * Evicts all blocks for a specific HFile. This is an 524 * expensive operation implemented as a linear-time search through all blocks 525 * in the cache. Ideally this should be a search in a log-access-time map. 526 * 527 * <p> 528 * This is used for evict-on-close to remove all blocks of a specific HFile. 529 * 530 * @return the number of blocks evicted 531 */ 532 @Override 533 public int evictBlocksByHfileName(String hfileName) { 534 int numEvicted = 0; 535 for (BlockCacheKey key : map.keySet()) { 536 if (key.getHfileName().equals(hfileName)) { 537 if (evictBlock(key)) 538 ++numEvicted; 539 } 540 } 541 if (victimHandler != null) { 542 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 543 } 544 return numEvicted; 545 } 546 547 /** 548 * Evict the block, and it will be cached by the victim handler if exists && 549 * block may be read again later 550 * 551 * @param evictedByEvictionProcess true if the given block is evicted by 552 * EvictionThread 553 * @return the heap size of evicted block 554 */ 555 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 556 boolean found = map.remove(block.getCacheKey()) != null; 557 if (!found) { 558 return 0; 559 } 560 updateSizeMetrics(block, true); 561 long val = elements.decrementAndGet(); 562 if (LOG.isTraceEnabled()) { 563 long size = map.size(); 564 assertCounterSanity(size, val); 565 } 566 if (block.getBuffer().getBlockType().isData()) { 567 dataBlockElements.decrement(); 568 } 569 if (evictedByEvictionProcess) { 570 // When the eviction of the block happened because of invalidation of HFiles, no need to 571 // update the stats counter. 572 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 573 if (victimHandler != null) { 574 victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); 575 } 576 } 577 return block.heapSize(); 578 } 579 580 /** 581 * Multi-threaded call to run the eviction process. 582 */ 583 private void runEviction() { 584 if (evictionThread == null) { 585 evict(); 586 } else { 587 evictionThread.evict(); 588 } 589 } 590 591 @VisibleForTesting 592 boolean isEvictionInProgress() { 593 return evictionInProgress; 594 } 595 596 @VisibleForTesting 597 long getOverhead() { 598 return overhead; 599 } 600 601 /** 602 * Eviction method. 603 */ 604 void evict() { 605 606 // Ensure only one eviction at a time 607 if(!evictionLock.tryLock()) return; 608 609 try { 610 evictionInProgress = true; 611 long currentSize = this.size.get(); 612 long bytesToFree = currentSize - minSize(); 613 614 if (LOG.isTraceEnabled()) { 615 LOG.trace("Block cache LRU eviction started; Attempting to free " + 616 StringUtils.byteDesc(bytesToFree) + " of total=" + 617 StringUtils.byteDesc(currentSize)); 618 } 619 620 if (bytesToFree <= 0) return; 621 622 // Instantiate priority buckets 623 BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize()); 624 BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); 625 BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); 626 627 // Scan entire map putting into appropriate buckets 628 for (LruCachedBlock cachedBlock : map.values()) { 629 switch (cachedBlock.getPriority()) { 630 case SINGLE: { 631 bucketSingle.add(cachedBlock); 632 break; 633 } 634 case MULTI: { 635 bucketMulti.add(cachedBlock); 636 break; 637 } 638 case MEMORY: { 639 bucketMemory.add(cachedBlock); 640 break; 641 } 642 } 643 } 644 645 long bytesFreed = 0; 646 if (forceInMemory || memoryFactor > 0.999f) { 647 long s = bucketSingle.totalSize(); 648 long m = bucketMulti.totalSize(); 649 if (bytesToFree > (s + m)) { 650 // this means we need to evict blocks in memory bucket to make room, 651 // so the single and multi buckets will be emptied 652 bytesFreed = bucketSingle.free(s); 653 bytesFreed += bucketMulti.free(m); 654 if (LOG.isTraceEnabled()) { 655 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + 656 " from single and multi buckets"); 657 } 658 bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); 659 if (LOG.isTraceEnabled()) { 660 LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + 661 " total from all three buckets "); 662 } 663 } else { 664 // this means no need to evict block in memory bucket, 665 // and we try best to make the ratio between single-bucket and 666 // multi-bucket is 1:2 667 long bytesRemain = s + m - bytesToFree; 668 if (3 * s <= bytesRemain) { 669 // single-bucket is small enough that no eviction happens for it 670 // hence all eviction goes from multi-bucket 671 bytesFreed = bucketMulti.free(bytesToFree); 672 } else if (3 * m <= 2 * bytesRemain) { 673 // multi-bucket is small enough that no eviction happens for it 674 // hence all eviction goes from single-bucket 675 bytesFreed = bucketSingle.free(bytesToFree); 676 } else { 677 // both buckets need to evict some blocks 678 bytesFreed = bucketSingle.free(s - bytesRemain / 3); 679 if (bytesFreed < bytesToFree) { 680 bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); 681 } 682 } 683 } 684 } else { 685 PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); 686 687 bucketQueue.add(bucketSingle); 688 bucketQueue.add(bucketMulti); 689 bucketQueue.add(bucketMemory); 690 691 int remainingBuckets = bucketQueue.size(); 692 693 BlockBucket bucket; 694 while ((bucket = bucketQueue.poll()) != null) { 695 long overflow = bucket.overflow(); 696 if (overflow > 0) { 697 long bucketBytesToFree = 698 Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); 699 bytesFreed += bucket.free(bucketBytesToFree); 700 } 701 remainingBuckets--; 702 } 703 } 704 if (LOG.isTraceEnabled()) { 705 long single = bucketSingle.totalSize(); 706 long multi = bucketMulti.totalSize(); 707 long memory = bucketMemory.totalSize(); 708 LOG.trace("Block cache LRU eviction completed; " + 709 "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + 710 "total=" + StringUtils.byteDesc(this.size.get()) + ", " + 711 "single=" + StringUtils.byteDesc(single) + ", " + 712 "multi=" + StringUtils.byteDesc(multi) + ", " + 713 "memory=" + StringUtils.byteDesc(memory)); 714 } 715 } finally { 716 stats.evict(); 717 evictionInProgress = false; 718 evictionLock.unlock(); 719 } 720 } 721 722 @Override 723 public String toString() { 724 return MoreObjects.toStringHelper(this) 725 .add("blockCount", getBlockCount()) 726 .add("currentSize", StringUtils.byteDesc(getCurrentSize())) 727 .add("freeSize", StringUtils.byteDesc(getFreeSize())) 728 .add("maxSize", StringUtils.byteDesc(getMaxSize())) 729 .add("heapSize", StringUtils.byteDesc(heapSize())) 730 .add("minSize", StringUtils.byteDesc(minSize())) 731 .add("minFactor", minFactor) 732 .add("multiSize", StringUtils.byteDesc(multiSize())) 733 .add("multiFactor", multiFactor) 734 .add("singleSize", StringUtils.byteDesc(singleSize())) 735 .add("singleFactor", singleFactor) 736 .toString(); 737 } 738 739 /** 740 * Used to group blocks into priority buckets. There will be a BlockBucket 741 * for each priority (single, multi, memory). Once bucketed, the eviction 742 * algorithm takes the appropriate number of elements out of each according 743 * to configuration parameters and their relatives sizes. 744 */ 745 private class BlockBucket implements Comparable<BlockBucket> { 746 747 private final String name; 748 private LruCachedBlockQueue queue; 749 private long totalSize = 0; 750 private long bucketSize; 751 752 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 753 this.name = name; 754 this.bucketSize = bucketSize; 755 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 756 totalSize = 0; 757 } 758 759 public void add(LruCachedBlock block) { 760 totalSize += block.heapSize(); 761 queue.add(block); 762 } 763 764 public long free(long toFree) { 765 if (LOG.isTraceEnabled()) { 766 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 767 } 768 LruCachedBlock cb; 769 long freedBytes = 0; 770 while ((cb = queue.pollLast()) != null) { 771 freedBytes += evictBlock(cb, true); 772 if (freedBytes >= toFree) { 773 return freedBytes; 774 } 775 } 776 if (LOG.isTraceEnabled()) { 777 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 778 } 779 return freedBytes; 780 } 781 782 public long overflow() { 783 return totalSize - bucketSize; 784 } 785 786 public long totalSize() { 787 return totalSize; 788 } 789 790 @Override 791 public int compareTo(BlockBucket that) { 792 return Long.compare(this.overflow(), that.overflow()); 793 } 794 795 @Override 796 public boolean equals(Object that) { 797 if (that == null || !(that instanceof BlockBucket)) { 798 return false; 799 } 800 return compareTo((BlockBucket)that) == 0; 801 } 802 803 @Override 804 public int hashCode() { 805 return Objects.hashCode(name, bucketSize, queue, totalSize); 806 } 807 808 @Override 809 public String toString() { 810 return MoreObjects.toStringHelper(this) 811 .add("name", name) 812 .add("totalSize", StringUtils.byteDesc(totalSize)) 813 .add("bucketSize", StringUtils.byteDesc(bucketSize)) 814 .toString(); 815 } 816 } 817 818 /** 819 * Get the maximum size of this cache. 820 * 821 * @return max size in bytes 822 */ 823 824 @Override 825 public long getMaxSize() { 826 return this.maxSize; 827 } 828 829 @Override 830 public long getCurrentSize() { 831 return this.size.get(); 832 } 833 834 @Override 835 public long getCurrentDataSize() { 836 return this.dataBlockSize.sum(); 837 } 838 839 @Override 840 public long getFreeSize() { 841 return getMaxSize() - getCurrentSize(); 842 } 843 844 @Override 845 public long size() { 846 return getMaxSize(); 847 } 848 849 @Override 850 public long getBlockCount() { 851 return this.elements.get(); 852 } 853 854 @Override 855 public long getDataBlockCount() { 856 return this.dataBlockElements.sum(); 857 } 858 859 EvictionThread getEvictionThread() { 860 return this.evictionThread; 861 } 862 863 /* 864 * Eviction thread. Sits in waiting state until an eviction is triggered 865 * when the cache size grows above the acceptable level.<p> 866 * 867 * Thread is triggered into action by {@link LruBlockCache#runEviction()} 868 */ 869 static class EvictionThread extends HasThread { 870 871 private WeakReference<LruBlockCache> cache; 872 private volatile boolean go = true; 873 // flag set after enter the run method, used for test 874 private boolean enteringRun = false; 875 876 public EvictionThread(LruBlockCache cache) { 877 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 878 setDaemon(true); 879 this.cache = new WeakReference<>(cache); 880 } 881 882 @Override 883 public void run() { 884 enteringRun = true; 885 while (this.go) { 886 synchronized (this) { 887 try { 888 this.wait(1000 * 10/*Don't wait for ever*/); 889 } catch (InterruptedException e) { 890 LOG.warn("Interrupted eviction thread ", e); 891 Thread.currentThread().interrupt(); 892 } 893 } 894 LruBlockCache cache = this.cache.get(); 895 if (cache == null) break; 896 cache.evict(); 897 } 898 } 899 900 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", 901 justification="This is what we want") 902 public void evict() { 903 synchronized (this) { 904 this.notifyAll(); 905 } 906 } 907 908 synchronized void shutdown() { 909 this.go = false; 910 this.notifyAll(); 911 } 912 913 /** 914 * Used for the test. 915 */ 916 boolean isEnteringRun() { 917 return this.enteringRun; 918 } 919 } 920 921 /* 922 * Statistics thread. Periodically prints the cache statistics to the log. 923 */ 924 static class StatisticsThread extends Thread { 925 926 private final LruBlockCache lru; 927 928 public StatisticsThread(LruBlockCache lru) { 929 super("LruBlockCacheStats"); 930 setDaemon(true); 931 this.lru = lru; 932 } 933 934 @Override 935 public void run() { 936 lru.logStats(); 937 } 938 } 939 940 public void logStats() { 941 // Log size 942 long totalSize = heapSize(); 943 long freeSize = maxSize - totalSize; 944 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 945 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 946 "max=" + StringUtils.byteDesc(this.maxSize) + ", " + 947 "blockCount=" + getBlockCount() + ", " + 948 "accesses=" + stats.getRequestCount() + ", " + 949 "hits=" + stats.getHitCount() + ", " + 950 "hitRatio=" + (stats.getHitCount() == 0 ? 951 "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + 952 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 953 "cachingHits=" + stats.getHitCachingCount() + ", " + 954 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 955 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 956 "evictions=" + stats.getEvictionCount() + ", " + 957 "evicted=" + stats.getEvictedCount() + ", " + 958 "evictedPerRun=" + stats.evictedPerEviction()); 959 } 960 961 /** 962 * Get counter statistics for this cache. 963 * 964 * <p>Includes: total accesses, hits, misses, evicted blocks, and runs 965 * of the eviction processes. 966 */ 967 @Override 968 public CacheStats getStats() { 969 return this.stats; 970 } 971 972 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align( 973 (4 * Bytes.SIZEOF_LONG) + (11 * ClassSize.REFERENCE) + 974 (6 * Bytes.SIZEOF_FLOAT) + (2 * Bytes.SIZEOF_BOOLEAN) 975 + ClassSize.OBJECT); 976 977 @Override 978 public long heapSize() { 979 return getCurrentSize(); 980 } 981 982 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 983 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 984 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 985 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 986 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 987 } 988 989 @Override 990 public Iterator<CachedBlock> iterator() { 991 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 992 993 return new Iterator<CachedBlock>() { 994 private final long now = System.nanoTime(); 995 996 @Override 997 public boolean hasNext() { 998 return iterator.hasNext(); 999 } 1000 1001 @Override 1002 public CachedBlock next() { 1003 final LruCachedBlock b = iterator.next(); 1004 return new CachedBlock() { 1005 @Override 1006 public String toString() { 1007 return BlockCacheUtil.toString(this, now); 1008 } 1009 1010 @Override 1011 public BlockPriority getBlockPriority() { 1012 return b.getPriority(); 1013 } 1014 1015 @Override 1016 public BlockType getBlockType() { 1017 return b.getBuffer().getBlockType(); 1018 } 1019 1020 @Override 1021 public long getOffset() { 1022 return b.getCacheKey().getOffset(); 1023 } 1024 1025 @Override 1026 public long getSize() { 1027 return b.getBuffer().heapSize(); 1028 } 1029 1030 @Override 1031 public long getCachedTime() { 1032 return b.getCachedTime(); 1033 } 1034 1035 @Override 1036 public String getFilename() { 1037 return b.getCacheKey().getHfileName(); 1038 } 1039 1040 @Override 1041 public int compareTo(CachedBlock other) { 1042 int diff = this.getFilename().compareTo(other.getFilename()); 1043 if (diff != 0) return diff; 1044 diff = Long.compare(this.getOffset(), other.getOffset()); 1045 if (diff != 0) return diff; 1046 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1047 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1048 } 1049 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1050 } 1051 1052 @Override 1053 public int hashCode() { 1054 return b.hashCode(); 1055 } 1056 1057 @Override 1058 public boolean equals(Object obj) { 1059 if (obj instanceof CachedBlock) { 1060 CachedBlock cb = (CachedBlock)obj; 1061 return compareTo(cb) == 0; 1062 } else { 1063 return false; 1064 } 1065 } 1066 }; 1067 } 1068 1069 @Override 1070 public void remove() { 1071 throw new UnsupportedOperationException(); 1072 } 1073 }; 1074 } 1075 1076 // Simple calculators of sizes given factors and maxSize 1077 1078 long acceptableSize() { 1079 return (long)Math.floor(this.maxSize * this.acceptableFactor); 1080 } 1081 private long minSize() { 1082 return (long)Math.floor(this.maxSize * this.minFactor); 1083 } 1084 private long singleSize() { 1085 return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1086 } 1087 private long multiSize() { 1088 return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1089 } 1090 private long memorySize() { 1091 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1092 } 1093 1094 @Override 1095 public void shutdown() { 1096 if (victimHandler != null) { 1097 victimHandler.shutdown(); 1098 } 1099 this.scheduleThreadPool.shutdown(); 1100 for (int i = 0; i < 10; i++) { 1101 if (!this.scheduleThreadPool.isShutdown()) { 1102 try { 1103 Thread.sleep(10); 1104 } catch (InterruptedException e) { 1105 LOG.warn("Interrupted while sleeping"); 1106 Thread.currentThread().interrupt(); 1107 break; 1108 } 1109 } 1110 } 1111 1112 if (!this.scheduleThreadPool.isShutdown()) { 1113 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow(); 1114 LOG.debug("Still running " + runnables); 1115 } 1116 this.evictionThread.shutdown(); 1117 } 1118 1119 /** Clears the cache. Used in tests. */ 1120 @VisibleForTesting 1121 public void clearCache() { 1122 this.map.clear(); 1123 this.elements.set(0); 1124 } 1125 1126 /** 1127 * Used in testing. May be very inefficient. 1128 * 1129 * @return the set of cached file names 1130 */ 1131 @VisibleForTesting 1132 SortedSet<String> getCachedFileNamesForTest() { 1133 SortedSet<String> fileNames = new TreeSet<>(); 1134 for (BlockCacheKey cacheKey : map.keySet()) { 1135 fileNames.add(cacheKey.getHfileName()); 1136 } 1137 return fileNames; 1138 } 1139 1140 @VisibleForTesting 1141 Map<BlockType, Integer> getBlockTypeCountsForTest() { 1142 Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class); 1143 for (LruCachedBlock cb : map.values()) { 1144 BlockType blockType = cb.getBuffer().getBlockType(); 1145 Integer count = counts.get(blockType); 1146 counts.put(blockType, (count == null ? 0 : count) + 1); 1147 } 1148 return counts; 1149 } 1150 1151 @VisibleForTesting 1152 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1153 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1154 for (LruCachedBlock block : map.values()) { 1155 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1156 Integer count = counts.get(encoding); 1157 counts.put(encoding, (count == null ? 0 : count) + 1); 1158 } 1159 return counts; 1160 } 1161 1162 public void setVictimCache(BlockCache handler) { 1163 assert victimHandler == null; 1164 victimHandler = handler; 1165 } 1166 1167 @VisibleForTesting 1168 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1169 return map; 1170 } 1171 1172 @Override 1173 @JsonIgnore 1174 public BlockCache[] getBlockCaches() { 1175 if (victimHandler != null) 1176 return new BlockCache[] {this, this.victimHandler}; 1177 return null; 1178 } 1179}