001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import com.github.benmanes.caffeine.cache.Cache; 023import com.github.benmanes.caffeine.cache.Caffeine; 024import com.github.benmanes.caffeine.cache.Policy.Eviction; 025import com.github.benmanes.caffeine.cache.RemovalCause; 026import com.github.benmanes.caffeine.cache.RemovalListener; 027import java.util.Comparator; 028import java.util.Iterator; 029import java.util.concurrent.Executor; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.io.HeapSize; 035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 036import org.apache.hadoop.util.StringUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043 044/** 045 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU 046 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide 047 * O(1) read and write operations. 048 * <ul> 049 * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> 050 * <li>Caffeine: https://github.com/ben-manes/caffeine</li> 051 * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> 052 * </ul> 053 */ 054@InterfaceAudience.Private 055public final class TinyLfuBlockCache implements FirstLevelBlockCache { 056 private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class); 057 058 private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size"; 059 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 060 private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60; 061 062 private transient final Eviction<BlockCacheKey, Cacheable> policy; 063 private transient final ScheduledExecutorService statsThreadPool; 064 private final long maxBlockSize; 065 private final CacheStats stats; 066 067 private transient BlockCache victimCache; 068 069 transient final Cache<BlockCacheKey, Cacheable> cache; 070 071 /** 072 * Creates a block cache. 073 * @param maximumSizeInBytes maximum size of this cache, in bytes 074 * @param avgBlockSize expected average size of blocks, in bytes 075 * @param executor the cache's executor 076 * @param conf additional configuration 077 */ 078 public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, Executor executor, 079 Configuration conf) { 080 this(maximumSizeInBytes, avgBlockSize, conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), 081 executor); 082 } 083 084 /** 085 * Creates a block cache. 086 * @param maximumSizeInBytes maximum size of this cache, in bytes 087 * @param avgBlockSize expected average size of blocks, in bytes 088 * @param maxBlockSize maximum size of a block, in bytes 089 * @param executor the cache's executor 090 */ 091 public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, long maxBlockSize, 092 Executor executor) { 093 this.cache = Caffeine.newBuilder().executor(executor).maximumWeight(maximumSizeInBytes) 094 .removalListener(new EvictionListener()) 095 .weigher( 096 (BlockCacheKey key, Cacheable value) -> (int) Math.min(value.heapSize(), Integer.MAX_VALUE)) 097 .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)).build(); 098 this.maxBlockSize = maxBlockSize; 099 this.policy = cache.policy().eviction().get(); 100 this.stats = new CacheStats(getClass().getSimpleName()); 101 102 statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 103 .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build()); 104 statsThreadPool.scheduleAtFixedRate(this::logStats, STAT_THREAD_PERIOD_SECONDS, 105 STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS); 106 } 107 108 @Override 109 public void setVictimCache(BlockCache victimCache) { 110 if (this.victimCache != null) { 111 throw new IllegalArgumentException("The victim cache has already been set"); 112 } 113 this.victimCache = requireNonNull(victimCache); 114 } 115 116 @Override 117 public long size() { 118 return policy.getMaximum(); 119 } 120 121 @Override 122 public long getFreeSize() { 123 return size() - getCurrentSize(); 124 } 125 126 @Override 127 public long getCurrentSize() { 128 return policy.weightedSize().getAsLong(); 129 } 130 131 @Override 132 public long getBlockCount() { 133 return cache.estimatedSize(); 134 } 135 136 @Override 137 public long heapSize() { 138 return getCurrentSize(); 139 } 140 141 @Override 142 public void setMaxSize(long size) { 143 policy.setMaximum(size); 144 } 145 146 @Override 147 public boolean containsBlock(BlockCacheKey cacheKey) { 148 return cache.asMap().containsKey(cacheKey); 149 } 150 151 @Override 152 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 153 boolean updateCacheMetrics) { 154 Cacheable value = cache.asMap().computeIfPresent(cacheKey, (blockCacheKey, cacheable) -> { 155 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 156 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 157 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 158 cacheable.retain(); 159 return cacheable; 160 }); 161 if (value == null) { 162 if (repeat) { 163 return null; 164 } 165 if (updateCacheMetrics) { 166 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 167 } 168 if (victimCache != null) { 169 value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 170 if ((value != null) && caching) { 171 cacheBlock(cacheKey, value); 172 } 173 } 174 } else if (updateCacheMetrics) { 175 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 176 } 177 return value; 178 } 179 180 @Override 181 public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) { 182 cacheBlock(cacheKey, value); 183 } 184 185 @Override 186 public void cacheBlock(BlockCacheKey key, Cacheable value) { 187 if (value.heapSize() > maxBlockSize) { 188 // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged) 189 if (stats.failInsert() % 50 == 0) { 190 LOG.warn(String.format( 191 "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d", 192 key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE)); 193 } 194 } else { 195 value = asReferencedHeapBlock(value); 196 cache.put(key, value); 197 } 198 } 199 200 /** 201 * The block cached in TinyLfuBlockCache will always be an heap block: on the one side, the heap 202 * access will be more faster then off-heap, the small index block or meta block cached in 203 * CombinedBlockCache will benefit a lot. on other side, the TinyLfuBlockCache size is always 204 * calculated based on the total heap size, if caching an off-heap block in TinyLfuBlockCache, the 205 * heap size will be messed up. Here we will clone the block into an heap block if it's an 206 * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of 207 * the block (HBASE-22127): <br> 208 * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 209 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 210 * reservoir, if both RPC and TinyLfuBlockCache release the block, then it can be garbage 211 * collected by JVM, so need a retain here. 212 * @param buf the original block 213 * @return an block with an heap memory backend. 214 */ 215 private Cacheable asReferencedHeapBlock(Cacheable buf) { 216 if (buf instanceof HFileBlock) { 217 HFileBlock blk = ((HFileBlock) buf); 218 if (blk.isSharedMem()) { 219 return HFileBlock.deepCloneOnHeap(blk); 220 } 221 } 222 // The block will be referenced by this TinyLfuBlockCache, so should increase its refCnt here. 223 return buf.retain(); 224 } 225 226 @Override 227 public boolean evictBlock(BlockCacheKey cacheKey) { 228 Cacheable value = cache.asMap().remove(cacheKey); 229 if (value != null) { 230 value.release(); 231 } 232 return (value != null); 233 } 234 235 @Override 236 public int evictBlocksByHfileName(String hfileName) { 237 int evicted = 0; 238 for (BlockCacheKey key : cache.asMap().keySet()) { 239 if (key.getHfileName().equals(hfileName) && evictBlock(key)) { 240 evicted++; 241 } 242 } 243 if (victimCache != null) { 244 evicted += victimCache.evictBlocksByHfileName(hfileName); 245 } 246 return evicted; 247 } 248 249 @Override 250 public CacheStats getStats() { 251 return stats; 252 } 253 254 @Override 255 public void shutdown() { 256 if (victimCache != null) { 257 victimCache.shutdown(); 258 } 259 statsThreadPool.shutdown(); 260 } 261 262 @Override 263 public BlockCache[] getBlockCaches() { 264 return null; 265 } 266 267 @Override 268 public Iterator<CachedBlock> iterator() { 269 long now = System.nanoTime(); 270 return cache.asMap().entrySet().stream() 271 .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now)) 272 .iterator(); 273 } 274 275 private void logStats() { 276 LOG.info("totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + "freeSize=" 277 + StringUtils.byteDesc(getFreeSize()) + ", " + "max=" + StringUtils.byteDesc(size()) + ", " 278 + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " 279 + "hits=" + stats.getHitCount() + ", " + "hitRatio=" 280 + (stats.getHitCount() == 0 ? "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") 281 + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" 282 + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" 283 + (stats.getHitCachingCount() == 0 284 ? "0," 285 : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) 286 + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount()); 287 } 288 289 @Override 290 public String toString() { 291 return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount()) 292 .add("currentSize", getCurrentSize()).add("freeSize", getFreeSize()).add("maxSize", size()) 293 .add("heapSize", heapSize()).add("victimCache", (victimCache != null)).toString(); 294 } 295 296 /** A removal listener to asynchronously record evictions and populate the victim cache. */ 297 private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> { 298 299 @Override 300 public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) { 301 if (!cause.wasEvicted()) { 302 // An explicit eviction (invalidation) is not added to the victim cache as the data may 303 // no longer be valid for subsequent queries. 304 return; 305 } 306 307 recordEviction(); 308 309 if (victimCache == null) { 310 return; 311 } else if (victimCache instanceof BucketCache) { 312 BucketCache victimBucketCache = (BucketCache) victimCache; 313 victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true); 314 } else { 315 victimCache.cacheBlock(key, value); 316 } 317 } 318 } 319 320 /** 321 * Records an eviction. The number of eviction operations and evicted blocks are identical, as an 322 * eviction is triggered immediately when the capacity has been exceeded. An eviction is performed 323 * asynchronously. See the library's documentation for details on write buffers, batching, and 324 * maintenance behavior. 325 */ 326 private void recordEviction() { 327 // FIXME: Currently does not capture the insertion time 328 stats.evicted(Long.MAX_VALUE, true); 329 stats.evict(); 330 } 331 332 private static final class CachedBlockView implements CachedBlock { 333 private static final Comparator<CachedBlock> COMPARATOR = 334 Comparator.comparing(CachedBlock::getFilename).thenComparing(CachedBlock::getOffset) 335 .thenComparing(CachedBlock::getCachedTime); 336 337 private final BlockCacheKey key; 338 private final Cacheable value; 339 private final long now; 340 341 public CachedBlockView(BlockCacheKey key, Cacheable value, long now) { 342 this.now = now; 343 this.key = key; 344 this.value = value; 345 } 346 347 @Override 348 public BlockPriority getBlockPriority() { 349 // This does not appear to be used in any meaningful way and is irrelevant to this cache 350 return BlockPriority.MEMORY; 351 } 352 353 @Override 354 public BlockType getBlockType() { 355 return value.getBlockType(); 356 } 357 358 @Override 359 public long getOffset() { 360 return key.getOffset(); 361 } 362 363 @Override 364 public long getSize() { 365 return value.heapSize(); 366 } 367 368 @Override 369 public long getCachedTime() { 370 // This does not appear to be used in any meaningful way, so not captured 371 return 0L; 372 } 373 374 @Override 375 public String getFilename() { 376 return key.getHfileName(); 377 } 378 379 @Override 380 public int compareTo(CachedBlock other) { 381 return COMPARATOR.compare(this, other); 382 } 383 384 @Override 385 public boolean equals(Object obj) { 386 if (obj == this) { 387 return true; 388 } else if (!(obj instanceof CachedBlock)) { 389 return false; 390 } 391 CachedBlock other = (CachedBlock) obj; 392 return compareTo(other) == 0; 393 } 394 395 @Override 396 public int hashCode() { 397 return key.hashCode(); 398 } 399 400 @Override 401 public String toString() { 402 return BlockCacheUtil.toString(this, now); 403 } 404 } 405 406 @Override 407 public long getMaxSize() { 408 return size(); 409 } 410 411 @Override 412 public long getCurrentDataSize() { 413 return getCurrentSize(); 414 } 415 416 @Override 417 public long getDataBlockCount() { 418 return getBlockCount(); 419 } 420}