/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static java.util.Objects.requireNonNull;

import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * <b>This implementation improves the performance of the classical LRU cache by up to 3 times, by
 * reducing garbage-collection work.</b>
 * <p/>
 * The classical block cache implementation that is memory-aware using {@link HeapSize},
 * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time
 * {@link #cacheBlock} and {@link #getBlock} operations.
 * <p/>
 * Contains three levels of block priority to allow for scan-resistance and in-memory families
 * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (an
 * in-memory column family is a column family that should be served from memory if possible):
 * single-access, multiple-access, and in-memory priority. A block is added with an in-memory
 * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()};
 * otherwise a block becomes single-access priority the first time it is read into this block
 * cache. If a block is accessed again while in cache, it is marked as multiple-access priority.
 * This delineation of blocks is used to prevent scans from thrashing the cache, adding a
 * least-frequently-used element to the eviction algorithm.
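 * <p/>
 * For example, a column family can be given in-memory priority like this (a minimal sketch; the
 * family name is illustrative only):
 *
 * <pre>{@code
 * ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
 *   .setInMemory(true) // blocks of this family are cached with in-memory priority
 *   .build();
 * }</pre>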
 * <p/>
 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
 * priority will retain close to its maximum size; however, if any priority is not using its entire
 * chunk the others are able to grow beyond their chunk size.
 * <p/>
 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
 * is only used for pre-allocating data structures and in initial heap estimation of the map.
 * <p/>
 * The detailed constructor defines the sizes for the three priorities (they should total to the
 * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
 * thread.
 * <p/>
 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
 * start. It evicts enough blocks to get the size below the minimum size specified.
 * <p/>
 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines
 * how many bytes must be freed to reach the minimum size, and then while scanning determines the
 * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
 * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
 * sizes and usage.
 * <p/>
 * The adaptive LRU cache speeds things up when we are reading much more data than can fit into the
 * BlockCache, which causes a high rate of evictions. This in turn leads to heavy garbage-collector
 * work: a lot of blocks are put into the BlockCache but never read, while a lot of CPU resources
 * are spent on cleaning them up. We can avoid this situation via the following parameters:
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.count.limit</b> - sets how many times the eviction process has
 * to run before we start skipping puts into the BlockCache. When it is 0, the feature starts
 * working immediately. But if we sometimes read the same data briefly and sometimes do long-term
 * reads, we can distinguish the two with this parameter. For example, if we know that our short
 * reads usually last about 1 minute, we can set the parameter to about 10 and the feature will be
 * enabled only for long, massive reads (after ~100 seconds). So when we do short reads and want all
 * of their blocks in the cache, we will have them (except for ordinary eviction, of course). When
 * we do long-term heavy reads, the feature is enabled after some time and brings better
 * performance.
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.mb.size.limit</b> - sets how many bytes we would like to put
 * into the BlockCache (and evict from it) per ~10 seconds. The feature will try to reach this value
 * and maintain it. Don't set it too small, because that leads to premature exit from this mode. For
 * powerful CPUs (about 20-40 physical cores) it could be about 400-500 MB. For an average system
 * (~10 cores), 200-300 MB. Some weak systems (2-5 cores) may be fine with 50-100 MB. How it works:
 * we set the limit, and after each ~10 seconds calculate how many bytes were freed:
 * Overhead = Freed Bytes Sum (MB) * 100 / Limit (MB) - 100. For example, if we set the limit = 500
 * and 2000 MB were evicted, the overhead is: 2000 * 100 / 500 - 100 = 300%. The feature is going to
 * reduce the percentage of cached data blocks and bring the evicted bytes closer to 100% of the
 * limit (500 MB) - a kind of auto-scaling. If fewer bytes are freed than the limit, the overhead is
 * negative. For example, if 200 MB were freed: 200 * 100 / 500 - 100 = -60%, and the feature will
 * increase the percentage of cached blocks, again bringing the evicted bytes closer to 100% of the
 * limit (500 MB).
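 * <p/>
 * A minimal sketch of that arithmetic (the variable names are illustrative, not part of the API;
 * it mirrors the calculation done in the eviction thread below):
 *
 * <pre>{@code
 * long limitMb = 500;     // hbase.lru.cache.heavy.eviction.mb.size.limit
 * long freedSumMb = 2000; // bytes freed during the last ~10 seconds, in MB
 * int overheadPercent = (int) (freedSumMb * 100 / limitMb) - 100; // = 300
 * }</pre>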
 * The current situation can be seen in the RegionServer log:
 * <p/>
 * "BlockCache evicted (MB): 0, overhead (%): -100, heavy eviction counter: 0, current caching
 * DataBlock (%): 100" - means no eviction, 100% of blocks are being cached;
 * <p/>
 * "BlockCache evicted (MB): 2000, overhead (%): 300, heavy eviction counter: 1, current caching
 * DataBlock (%): 97" - means eviction has begun, and caching of blocks has been reduced by 3%.
 * <p/>
 * This helps tune your system and find out which value suits it best. Don't try to reach 0%
 * overhead - it is impossible. An overhead of 50-100% is quite good and prevents premature exit
 * from this mode.
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.overhead.coefficient</b> - sets how fast we want to get the
 * result. If we know that our reads are heavy for a long time, we don't want to wait and can
 * increase the coefficient to get good performance sooner. But if we aren't sure, we can do it
 * slowly, which helps prevent premature exit from this mode. So, when the coefficient is higher, we
 * get better performance when heavy reading is stable; when the read pattern is changing, a lower
 * coefficient lets us adjust to it. For example, say we set the coefficient = 0.01. It means the
 * overhead (see above) will be multiplied by 0.01 and the result is the percentage by which the
 * caching of blocks is reduced. For example, if the overhead = 300% and the coefficient = 0.01,
 * then the percentage of cached blocks will be reduced by 3%. Similar logic applies when the
 * overhead is negative (overshooting): maybe it is just a short-term fluctuation, so we try to stay
 * in this mode. This helps avoid premature exit during short-term fluctuations. The backpressure
 * has simple logic: the more overshooting, the more blocks are cached.
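 * <p/>
 * A minimal sketch of enabling the feature programmatically (the values are illustrative only; the
 * same keys can also be set in hbase-site.xml):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.setInt("hbase.lru.cache.heavy.eviction.count.limit", 10);     // enable after ~100 s of heavy eviction
 * conf.setLong("hbase.lru.cache.heavy.eviction.mb.size.limit", 300); // target ~300 MB evicted per ~10 s
 * conf.setFloat("hbase.lru.cache.heavy.eviction.overhead.coefficient", 0.01f);
 * }</pre>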
 * <p/>
 * Find more information about this improvement at
 * https://issues.apache.org/jira/browse/HBASE-23887
 */
@InterfaceAudience.Private
public class LruAdaptiveBlockCache implements FirstLevelBlockCache {

  private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache (no evictions if size < acceptable)
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of cache, will reject any put if size > this * acceptable
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.hard.capacity.limit.factor";
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key to force the data blocks of in-memory hfiles to always be cached in memory
   * (unless the in-memory bucket overflows). Unlike inMemory, which is a column-family
   * configuration, inMemoryForceMode is a cluster-wide configuration.
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
    "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters */

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  private static final String LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT =
    "hbase.lru.cache.heavy.eviction.count.limit";
  // The default value effectively disables the performance feature:
  // 2147483647 eviction runs at one run per ~10 seconds is about ~680 years before
  // the feature would start to work. Set it to 0-10 to get the benefit right away
  // (see details at https://issues.apache.org/jira/browse/HBASE-23887).
  private static final int DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT = Integer.MAX_VALUE;

  private static final String LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT =
    "hbase.lru.cache.heavy.eviction.mb.size.limit";
  private static final long DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT = 500;

  private static final String LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT =
    "hbase.lru.cache.heavy.eviction.overhead.coefficient";
  private static final float DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT = 0.01f;

  /**
   * The cache map is defined as a {@link ConcurrentHashMap} here because, in
   * {@link LruAdaptiveBlockCache#getBlock}, we need to guarantee the atomicity of
   * map#computeIfPresent(key, func). Besides, the func method must execute exactly once, only when
   * the key is present and under the lock context; otherwise the reference count will be messed
   * up.
   * Notice that {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
   */
  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

  /** Eviction lock (locked when eviction in process) */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  private final long maxBlockSize;

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread */
  private transient final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache */
  private final AtomicLong size;

  /** Current size of data blocks */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /** Cache access count (sequential ID) */
  private final AtomicLong count;

  /** hard capacity limit */
  private final float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private final long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private final float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private final float minFactor;

  /** Single access bucket size */
  private final float singleFactor;

  /** Multiple access bucket size */
  private final float multiFactor;

  /** In-memory bucket size */
  private final float memoryFactor;

  /** Overhead of the structure itself */
  private final long overhead;

  /** Whether an in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use
   * an external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;

  /** Percent of data blocks that are cached */
  private volatile int cacheDataBlockPercent;

  /** Number of eviction runs after which we start skipping caching of blocks */
  private final int heavyEvictionCountLimit;

  /** Eviction volume (MB per ~10 s) above which we start skipping caching of blocks */
  private final long heavyEvictionMbSizeLimit;

  /** Adjusts the auto-scaling based on the overhead of the eviction rate */
  private final float heavyEvictionOverheadCoefficient;

  /**
   * Default constructor. Specify maximum size and expected average block size (approximation is
   * fine).
   * <p>
   * All other factors will be calculated based on defaults specified in this class.
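   * <p>
   * A minimal usage sketch (the sizes are illustrative only):
   *
   * <pre>{@code
   * // 512 MB cache with an expected average block size of 64 KB
   * LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(512L * 1024 * 1024, 64 * 1024);
   * }</pre>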
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }

  /**
   * Constructor used for testing. Allows disabling of the eviction thread.
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread) {
    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR,
      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE,
      DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT, DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
      DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
  }

  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
    Configuration conf) {
    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
      conf.getInt(LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT),
      conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
      conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
  }

  public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }

  /**
   * Configurable constructor. Use this constructor if not using defaults.
   * @param maxSize                          maximum size of this cache, in bytes
   * @param blockSize                        expected average size of blocks, in bytes
   * @param evictionThread                   whether to run evictions in a bg thread or not
   * @param mapInitialSize                   initial size of backing ConcurrentHashMap
   * @param mapLoadFactor                    initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel              initial concurrency factor for backing CHM
   * @param minFactor                        percentage of total size that eviction will evict
   *                                         until
   * @param acceptableFactor                 percentage of total size that triggers eviction
   * @param singleFactor                     percentage of total size for single-access blocks
   * @param multiFactor                      percentage of total size for multiple-access blocks
   * @param memoryFactor                     percentage of total size for in-memory blocks
   * @param hardLimitFactor                  hard capacity limit
   * @param forceInMemory                    in-memory hfile's data block has higher priority when
   *                                         evicting
   * @param maxBlockSize                     maximum block size for caching
   * @param heavyEvictionCountLimit          when the AdaptiveLRU algorithm starts working
   * @param heavyEvictionMbSizeLimit         how many bytes are desirable to put into BlockCache
   * @param heavyEvictionOverheadCoefficient how aggressively AdaptiveLRU will reduce GC
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
    int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor,
    float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor,
    float hardLimitFactor, boolean forceInMemory, long maxBlockSize, int heavyEvictionCountLimit,
    long heavyEvictionMbSizeLimit, float heavyEvictionOverheadCoefficient) {
    this.maxBlockSize = maxBlockSize;
    if (
      singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0
        || memoryFactor < 0
    ) {
      throw new IllegalArgumentException(
        "Single, multi, and memory factors should be non-negative and total 1.0");
    }
    if (minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    this.forceInMemory = forceInMemory;
    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats(this.getClass().getSimpleName());
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.dataBlockElements = new LongAdder();
    this.dataBlockSize = new LongAdder();
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    this.size = new AtomicLong(this.overhead);
    this.hardCapacityLimitFactor = hardLimitFactor;
    if (evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }

    // Check the bounds: the count limit must be non-negative, the MB limit at least 1,
    // and the coefficient within [0.001, 1.0].
    this.heavyEvictionCountLimit = Math.max(heavyEvictionCountLimit, 0);
    this.heavyEvictionMbSizeLimit = Math.max(heavyEvictionMbSizeLimit, 1);
    this.cacheDataBlockPercent = 100;
    heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
    heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
    this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;

    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  @Override
  public void setVictimCache(BlockCache victimCache) {
    if (victimHandler != null) {
      throw new IllegalArgumentException("The victim cache has already been set");
    }
    victimHandler = requireNonNull(victimCache);
  }

  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
    if (this.size.get() > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  public int getCacheDataBlockPercent() {
    return cacheDataBlockPercent;
  }

  /**
   * The block cached in LruAdaptiveBlockCache will always be a heap block: on the one hand, heap
   * access is faster than off-heap, so the small index or meta blocks cached in CombinedBlockCache
   * benefit a lot; on the other hand, the LruAdaptiveBlockCache size is always calculated based on
   * the total heap size, so caching an off-heap block in LruAdaptiveBlockCache would mess up the
   * heap-size accounting. Here we clone the block into a heap block if it's an off-heap block,
   * otherwise we just use the original block. The key point is to maintain the refCnt of the block
   * (HBASE-22127): <br>
   * 1. if we cache the cloned heap block, its refCnt is a brand-new one, which is easy to handle;
   * <br>
   * 2. if we cache the original heap block, we're sure that it won't be tracked in
   * ByteBuffAllocator's reservoir; if both the RPC and the LruAdaptiveBlockCache release the block,
   * then it can be garbage collected by the JVM, so we need a retain here.
   * @param buf the original block
   * @return a block with a heap memory backend.
   */
  private Cacheable asReferencedHeapBlock(Cacheable buf) {
    if (buf instanceof HFileBlock) {
      HFileBlock blk = ((HFileBlock) buf);
      if (blk.isSharedMem()) {
        return HFileBlock.deepCloneOnHeap(blk);
      }
    }
    // The block will be referenced by this LruAdaptiveBlockCache,
    // so we should increase its refCnt here.
    return buf.retain();
  }

  // BlockCache implementation

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   * @param inMemory if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {

    // Some data blocks will not be put into the BlockCache when the eviction rate is too high.
    // This is good for performance
    // (see details: https://issues.apache.org/jira/browse/HBASE-23887).
    // The calculation can be found in the EvictionThread class.
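    // For example (illustrative numbers): with cacheDataBlockPercent = 97, data blocks whose
    // offset ends in 97, 98 or 99 are skipped, i.e. roughly 3% of data blocks are not cached.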
    if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) {
      // It works like a filter: blocks whose last two offset digits are greater than or equal
      // to the percent calculated in the eviction thread will not be put into the BlockCache.
      if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) {
        return;
      }
    }

    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too
      // big this can make the logs way too noisy.
      // So we log 2%
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ "
          + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than "
          + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
      return;
    }
    long currentSize = size.get();
    long currentAcceptableSize = acceptableSize();
    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
    if (currentSize >= hardLimitSize) {
      stats.failInsert();
      if (LOG.isTraceEnabled()) {
        LOG.trace("LruAdaptiveBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
          + ", failed to put cacheKey:" + cacheKey + " into LruAdaptiveBlockCache.");
      }
      if (!evictionInProgress) {
        runEviction();
      }
      return;
    }
    // Ensure that the block is a heap one.
    buf = asReferencedHeapBlock(buf);
    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (buf.getBlockType().isData()) {
      dataBlockElements.increment();
    }
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > currentAcceptableSize && !evictionInProgress) {
      runEviction();
    }
  }

  /**
   * Sanity-checking for parity between actual block cache content and metrics. Intended only for
   * use with TRACE level logging and -ea JVM.
   */
  private static void assertCounterSanity(long mapSize, long counterVal) {
    if (counterVal < 0) {
      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal
        + ", mapSize=" + mapSize);
      return;
    }
    if (mapSize < Integer.MAX_VALUE) {
      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
      if (pct_diff > 0.05) {
        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal
          + ", mapSize=" + mapSize);
      }
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * TODO after HBASE-22005, we may cache a block which is allocated off-heap, but our LRU cache
   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce a
   * switch controlling whether the LRU is on-heap or not; if so we may need to copy the memory to
   * on-heap, otherwise the caching size is based on off-heap.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Helper function that updates the local size counter and also updates any per-cf or
   * per-blocktype metrics it can discern from the given {@link LruCachedBlock}
   */
  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
    long heapsize = cb.heapSize();
    BlockType bt = cb.getBuffer().getBlockType();
    if (evict) {
      heapsize *= -1;
    }
    if (bt != null && bt.isData()) {
      dataBlockSize.add(heapsize);
    }
    return size.addAndGet(heapsize);
  }

  /**
   * Get the buffer of the block with the specified name.
   * @param cacheKey           block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block (used to avoid
   *                           double counting cache misses when doing double-check locking)
   * @param updateCacheMetrics Whether to update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
      // It will be referenced by the RPC path, so increase it here. NOTICE: the retain must be
      // done inside this block, because if it is done outside map#computeIfPresent, evictBlock
      // may remove the block and release it first, and then we would be retaining a block with
      // refCnt=0, which is disallowed. See HBASE-22422.
      val.getBuffer().retain();
      return val;
    });
    if (cb == null) {
      if (!repeat && updateCacheMetrics) {
        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
      }
      // If there is another block cache, then try to read from there.
      // However, if this is a retry (second time in double-checked locking)
      // and it's already a miss, then the L2 will also be a miss.
      if (victimHandler != null && !repeat) {
        // The handler will increase result's refCnt for RPC, so no extra retain is needed.
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
        // Promote this to L1.
        if (result != null) {
          if (caching) {
            cacheBlock(cacheKey, result, /* inMemory = */ false);
          }
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) {
      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
    }
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }

  /**
   * Whether the cache contains the block with the specified cacheKey
   * @return true if it contains the block
   */
  @Override
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    LruCachedBlock cb = map.get(cacheKey);
    return cb != null && evictBlock(cb, false) > 0;
  }

  /**
   * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a
   * linear-time search through all blocks in the cache. Ideally this should be a search in a
   * log-access-time map.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = (int) map.keySet().stream().filter(key -> key.getHfileName().equals(hfileName))
      .filter(this::evictBlock).count();
    if (victimHandler != null) {
      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
    }
    return numEvicted;
  }

  /**
   * Evict the block; it will be cached by the victim handler if one exists and the block may be
   * read again later.
   * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread
   * @return the heap size of the evicted block
   */
  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
    LruCachedBlock previous = map.remove(block.getCacheKey());
    if (previous == null) {
      return 0;
    }
    updateSizeMetrics(block, true);
    long val = elements.decrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (block.getBuffer().getBlockType().isData()) {
      dataBlockElements.decrement();
    }
    if (evictedByEvictionProcess) {
      // When the eviction of the block happened because of invalidation of HFiles, no need to
      // update the stats counter.
      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
      if (victimHandler != null) {
        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
      }
    }
    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
    // NOT move this up, because then the victimHandler may access a buffer with refCnt = 0,
    // which is disallowed.
    previous.getBuffer().release();
    return block.heapSize();
  }

  /**
   * Multi-threaded call to run the eviction process.
   */
  private void runEviction() {
    if (evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

  boolean isEvictionInProgress() {
    return evictionInProgress;
  }

  long getOverhead() {
    return overhead;
  }

  /**
   * Eviction method. Evicts items in order of use, deleting the items which haven't been used for
   * the longest amount of time first.
   * @return how many bytes were freed
   */
  long evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) {
      return 0;
    }

    long bytesToFree = 0L;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free "
          + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) {
        return 0;
      }

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan the entire map, putting each block into the appropriate bucket
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // This means we need to evict blocks in the memory bucket to make room,
          // so the single and multi buckets will be emptied.
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace(
              "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace(
              "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
          }
        } else {
          // This means there is no need to evict blocks in the memory bucket,
          // and we try our best to make the ratio between the single bucket and
          // the multi bucket 1:2 after eviction.
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // The single bucket is small enough that no eviction happens for it,
            // hence all eviction goes from the multi bucket.
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // The multi bucket is small enough that no eviction happens for it,
            // hence all eviction goes from the single bucket.
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // Both buckets need to evict some blocks: after eviction the single bucket
            // should hold bytesRemain / 3 and the multi bucket 2 * bytesRemain / 3.
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            long bucketBytesToFree =
              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace(
          "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed)
            + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single="
            + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", "
            + "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
    return bytesToFree;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
      .add("heapSize", StringUtils.byteDesc(heapSize()))
      .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor)
      .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor)
      .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor)
      .toString();
  }

  /**
   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority
   * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of
   * elements out of each according to configuration parameters and their relative sizes.
   */
  private class BlockBucket implements Comparable<BlockBucket> {

    private final String name;
    private final LruCachedBlockQueue queue;
    private long totalSize = 0;
    private final long bucketSize;

    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
      this.name = name;
      this.bucketSize = bucketSize;
      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(LruCachedBlock block) {
      totalSize += block.heapSize();
      queue.add(block);
    }

    public long free(long toFree) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
      }
      LruCachedBlock cb;
      long freedBytes = 0;
      while ((cb = queue.pollLast()) != null) {
        freedBytes += evictBlock(cb, true);
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("freed {} from {}", StringUtils.byteDesc(freedBytes), this);
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BlockBucket that) {
      return Long.compare(this.overflow(), that.overflow());
    }

    @Override
    public boolean equals(Object that) {
      if (!(that instanceof BlockBucket)) {
        return false;
      }
      return compareTo((BlockBucket) that) == 0;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(name, bucketSize, queue, totalSize);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this).add("name", name)
        .add("totalSize", StringUtils.byteDesc(totalSize))
        .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString();
    }
  }

  /**
   * Get the maximum size of this cache.
   * @return max size in bytes
   */
  @Override
  public long getMaxSize() {
    return this.maxSize;
  }

  @Override
  public long getCurrentSize() {
    return this.size.get();
  }

  @Override
  public long getCurrentDataSize() {
    return this.dataBlockSize.sum();
  }

  @Override
  public long getFreeSize() {
    return getMaxSize() - getCurrentSize();
  }

  @Override
  public long size() {
    return getMaxSize();
  }

  @Override
  public long getBlockCount() {
    return this.elements.get();
  }

  @Override
  public long getDataBlockCount() {
    return this.dataBlockElements.sum();
  }

  EvictionThread getEvictionThread() {
    return this.evictionThread;
  }

  /*
   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size
   * grows above the acceptable level.<p> Thread is triggered into action by {@link
   * LruAdaptiveBlockCache#runEviction()}
   */
  static class EvictionThread extends Thread {

    private WeakReference<LruAdaptiveBlockCache> cache;
    private volatile boolean go = true;
    // flag set after entering the run method, used for test
    private boolean enteringRun = false;

    public EvictionThread(LruAdaptiveBlockCache cache) {
      super(Thread.currentThread().getName() + ".LruAdaptiveBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<>(cache);
    }

    @Override
    public void run() {
      enteringRun = true;
      long freedSumMb = 0;
      int heavyEvictionCount = 0;
      int freedDataOverheadPercent = 0;
      long startTime = EnvironmentEdgeManager.currentTime();
      while (this.go) {
        synchronized (this) {
          try {
            this.wait(1000 * 10/* Don't wait forever */);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted eviction thread ", e);
            Thread.currentThread().interrupt();
          }
        }
        LruAdaptiveBlockCache cache = this.cache.get();
        if (cache == null) {
          break;
        }
        freedSumMb += cache.evict() / 1024 / 1024;
        /*
         * Sometimes we read more data than can fit into the BlockCache, which causes a high rate
         * of evictions. This in turn leads to heavy garbage-collector work: a lot of blocks are
         * put into the BlockCache but never read, while a lot of CPU resources are spent on
         * cleaning them up. Here we analyze how many bytes were freed and decide whether the time
         * has come to reduce the amount of cached blocks. This helps avoid putting too many blocks
         * into the BlockCache when evict() is working very actively, and saves CPU for other jobs.
         * More details: https://issues.apache.org/jira/browse/HBASE-23887
         */

        // First of all we have to check how much time has passed since the previous evict()
        // was launched. It should be almost the same interval (+/- 10 s) each time, so that
        // we get comparable volumes of freed bytes each time.
        // 10 s because this is the default period for running evict() (see this.wait above).
        long stopTime = EnvironmentEdgeManager.currentTime();
        if ((stopTime - startTime) > 1000 * 10 - 1) {
          // Here we have to figure out what situation we are in.
          // We have the limit "hbase.lru.cache.heavy.eviction.mb.size.limit"
          // and can calculate the overhead relative to it.
          // We will use this information to decide how to change the
          // percentage of cached blocks.
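          // For example (illustrative numbers): with limit = 500 MB and freedSumMb = 2000 MB,
          // the overhead below comes out as 2000 * 100 / 500 - 100 = 300 (%).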
          freedDataOverheadPercent =
            (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100;
          if (freedSumMb > cache.heavyEvictionMbSizeLimit) {
            // Now we are in the situation where we are above the limit.
            // But maybe we are going to ignore it, because it will end quite soon.
            heavyEvictionCount++;
            if (heavyEvictionCount > cache.heavyEvictionCountLimit) {
              // It has been going on for a long time and we have to reduce the caching
              // of blocks now. So we calculate here how many blocks we want to skip.
              // It depends on:
              // 1. Overhead - if the overhead is big, we can reduce the amount of
              // cached blocks more aggressively.
              // 2. How fast we want to get the result. If we know that our heavy
              // reading will last a long time, we don't want to wait and can
              // increase the coefficient to get good performance quite soon.
              // But if we aren't sure, we can do it slowly, which could prevent
              // premature exit from this mode. So, when the coefficient is
              // higher we get better performance when heavy reading is stable.
              // But when the read pattern is changing, we can adjust to it by
              // setting the coefficient to a lower value.
              int change =
                (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient);
              // But practice shows that reducing by 15% is quite enough.
              // We are not greedy (greed could lead to premature exit).
              change = Math.min(15, change);
              change = Math.max(0, change); // It should never happen, but check to be sure.
              // So this is the key point: here we reduce the percentage of cached blocks.
              cache.cacheDataBlockPercent -= change;
              // If we go down too deep we have to stop here; 1% has to remain in any case.
              cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent);
            }
          } else {
            // Well, we have got overshooting.
            // Maybe it is just a short-term fluctuation and we can stay in this mode.
            // This helps avoid premature exit during short-term fluctuations.
            // If the undershoot is less than 90% (i.e. we still freed at least 10% of the
            // limit), we will try to increase the percentage of cached blocks and hope it
            // is enough.
            if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) {
              // Simple logic: more overshooting - more cached blocks (backpressure)
              int change = (int) (-freedDataOverheadPercent * 0.1 + 1);
              cache.cacheDataBlockPercent += change;
              // But it can't be more than 100%, so check it.
              cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent);
            } else {
              // Looks like heavy reading is over.
              // Just exit from this mode.
              heavyEvictionCount = 0;
              cache.cacheDataBlockPercent = 100;
            }
          }
          LOG.info(
            "BlockCache evicted (MB): {}, overhead (%): {}, " + "heavy eviction counter: {}, "
              + "current caching DataBlock (%): {}",
            freedSumMb, freedDataOverheadPercent, heavyEvictionCount, cache.cacheDataBlockPercent);

          freedSumMb = 0;
          startTime = stopTime;
        }
      }
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
        justification = "This is what we want")
    public void evict() {
      synchronized (this) {
        this.notifyAll();
      }
    }

    synchronized void shutdown() {
      this.go = false;
      this.notifyAll();
    }

    /**
     * Used for the test.
     */
    boolean isEnteringRun() {
      return this.enteringRun;
    }
  }

  /*
   * Statistics thread. Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends Thread {

    private final LruAdaptiveBlockCache lru;

    public StatisticsThread(LruAdaptiveBlockCache lru) {
      super("LruAdaptiveBlockCacheStats");
      setDaemon(true);
      this.lru = lru;
    }

    @Override
    public void run() {
      lru.logStats();
    }
  }

  public void logStats() {
    // Log size
    long totalSize = heapSize();
    long freeSize = maxSize - totalSize;
    LruAdaptiveBlockCache.LOG
      .info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize="
        + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize)
        + ", " + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount()
        + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio="
        + (stats.getHitCount() == 0
          ? "0"
          : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", "))
        + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits="
        + stats.getHitCachingCount() + ", " + "cachingHitsRatio="
        + (stats.getHitCachingCount() == 0
          ? "0,"
          : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", "))
        + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount()
        + ", " + "evictedPerRun=" + stats.evictedPerEviction());
  }

  /**
   * Get counter statistics for this cache.
   * <p>
   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
   */
  @Override
  public CacheStats getStats() {
    return this.stats;
  }

  public final static long CACHE_FIXED_OVERHEAD =
    ClassSize.estimateBase(LruAdaptiveBlockCache.class, false);

  @Override
  public long heapSize() {
    return getCurrentSize();
  }

  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    final Iterator<LruCachedBlock> iterator = map.values().iterator();

    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public CachedBlock next() {
        final LruCachedBlock b = iterator.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return b.getPriority();
          }

          @Override
          public BlockType getBlockType() {
            return b.getBuffer().getBlockType();
          }

          @Override
          public long getOffset() {
            return b.getCacheKey().getOffset();
          }

          @Override
          public long getSize() {
            return b.getBuffer().heapSize();
          }

          @Override
          public long getCachedTime() {
            return b.getCachedTime();
          }

          @Override
          public String getFilename() {
            return b.getCacheKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) {
              return diff;
            }
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) {
              return diff;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return b.hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  // Simple calculators of sizes given factors and maxSize

  long acceptableSize() {
    return (long) Math.floor(this.maxSize * this.acceptableFactor);
  }

  private long minSize() {
    return (long) Math.floor(this.maxSize * this.minFactor);
  }

  private long singleSize() {
    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
  }

  private long multiSize() {
    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
  }

  private long memorySize() {
    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }

  @Override
  public void shutdown() {
    if (victimHandler != null) {
      victimHandler.shutdown();
    }
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < 10; i++) {
      if (!this.scheduleThreadPool.isShutdown()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while sleeping");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }

    if (!this.scheduleThreadPool.isShutdown()) {
      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
      LOG.debug("Still running " + runnables);
    }
    this.evictionThread.shutdown();
  }

  /** Clears the cache. Used in tests. */
  public void clearCache() {
    this.map.clear();
    this.elements.set(0);
  }

  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
    for (LruCachedBlock block : map.values()) {
      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
      Integer count = counts.get(encoding);
      counts.put(encoding, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
    return map;
  }

  @Override
  public BlockCache[] getBlockCaches() {
    if (victimHandler != null) {
      return new BlockCache[] { this, this.victimHandler };
    }
    return null;
  }
}