001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.util.Comparator; 023import java.util.Iterator; 024import java.util.concurrent.Executor; 025import java.util.concurrent.Executors; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.TimeUnit; 028 029import com.github.benmanes.caffeine.cache.Cache; 030import com.github.benmanes.caffeine.cache.Caffeine; 031import com.github.benmanes.caffeine.cache.Policy.Eviction; 032import com.github.benmanes.caffeine.cache.RemovalCause; 033import com.github.benmanes.caffeine.cache.RemovalListener; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.io.HeapSize; 037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 038import org.apache.hadoop.util.StringUtils; 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 042import org.apache.yetus.audience.InterfaceAudience; 043 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU 049 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide 050 * O(1) read and write operations. 051 * <ul> 052 * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> 053 * <li>Caffeine: https://github.com/ben-manes/caffeine</li> 054 * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> 055 * </ul> 056 */ 057@InterfaceAudience.Private 058public final class TinyLfuBlockCache implements FirstLevelBlockCache { 059 private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class); 060 061 private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size"; 062 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 063 private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60; 064 065 private final Eviction<BlockCacheKey, Cacheable> policy; 066 private final ScheduledExecutorService statsThreadPool; 067 private final long maxBlockSize; 068 private final CacheStats stats; 069 070 private BlockCache victimCache; 071 072 @VisibleForTesting 073 final Cache<BlockCacheKey, Cacheable> cache; 074 075 /** 076 * Creates a block cache. 077 * 078 * @param maximumSizeInBytes maximum size of this cache, in bytes 079 * @param avgBlockSize expected average size of blocks, in bytes 080 * @param executor the cache's executor 081 * @param conf additional configuration 082 */ 083 public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, 084 Executor executor, Configuration conf) { 085 this(maximumSizeInBytes, avgBlockSize, 086 conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor); 087 } 088 089 /** 090 * Creates a block cache. 091 * 092 * @param maximumSizeInBytes maximum size of this cache, in bytes 093 * @param avgBlockSize expected average size of blocks, in bytes 094 * @param maxBlockSize maximum size of a block, in bytes 095 * @param executor the cache's executor 096 */ 097 public TinyLfuBlockCache(long maximumSizeInBytes, 098 long avgBlockSize, long maxBlockSize, Executor executor) { 099 this.cache = Caffeine.newBuilder() 100 .executor(executor) 101 .maximumWeight(maximumSizeInBytes) 102 .removalListener(new EvictionListener()) 103 .weigher((BlockCacheKey key, Cacheable value) -> 104 (int) Math.min(value.heapSize(), Integer.MAX_VALUE)) 105 .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)) 106 .build(); 107 this.maxBlockSize = maxBlockSize; 108 this.policy = cache.policy().eviction().get(); 109 this.stats = new CacheStats(getClass().getSimpleName()); 110 111 statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 112 .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build()); 113 statsThreadPool.scheduleAtFixedRate(this::logStats, 114 STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS); 115 } 116 117 @Override 118 public void setVictimCache(BlockCache victimCache) { 119 if (this.victimCache != null) { 120 throw new IllegalArgumentException("The victim cache has already been set"); 121 } 122 this.victimCache = requireNonNull(victimCache); 123 } 124 125 @Override 126 public long size() { 127 return policy.getMaximum(); 128 } 129 130 @Override 131 public long getFreeSize() { 132 return size() - getCurrentSize(); 133 } 134 135 @Override 136 public long getCurrentSize() { 137 return policy.weightedSize().getAsLong(); 138 } 139 140 @Override 141 public long getBlockCount() { 142 return cache.estimatedSize(); 143 } 144 145 @Override 146 public long heapSize() { 147 return getCurrentSize(); 148 } 149 150 @Override 151 public void setMaxSize(long size) { 152 policy.setMaximum(size); 153 } 154 155 @Override 156 public boolean containsBlock(BlockCacheKey cacheKey) { 157 return cache.asMap().containsKey(cacheKey); 158 } 159 160 @Override 161 public Cacheable getBlock(BlockCacheKey cacheKey, 162 boolean caching, boolean repeat, boolean updateCacheMetrics) { 163 Cacheable value = cache.getIfPresent(cacheKey); 164 if (value == null) { 165 if (repeat) { 166 return null; 167 } 168 if (updateCacheMetrics) { 169 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 170 } 171 if (victimCache != null) { 172 value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 173 if ((value != null) && caching) { 174 if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) { 175 value = HFileBlock.deepCloneOnHeap((HFileBlock) value); 176 } 177 cacheBlock(cacheKey, value); 178 } 179 } 180 } else if (updateCacheMetrics) { 181 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 182 } 183 return value; 184 } 185 186 @Override 187 public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) { 188 cacheBlock(cacheKey, value); 189 } 190 191 @Override 192 public void cacheBlock(BlockCacheKey key, Cacheable value) { 193 if (value.heapSize() > maxBlockSize) { 194 // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged) 195 if (stats.failInsert() % 50 == 0) { 196 LOG.warn(String.format( 197 "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d", 198 key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE)); 199 } 200 } else { 201 cache.put(key, value); 202 } 203 } 204 205 @Override 206 public boolean evictBlock(BlockCacheKey cacheKey) { 207 Cacheable value = cache.asMap().remove(cacheKey); 208 return (value != null); 209 } 210 211 @Override 212 public int evictBlocksByHfileName(String hfileName) { 213 int evicted = 0; 214 for (BlockCacheKey key : cache.asMap().keySet()) { 215 if (key.getHfileName().equals(hfileName) && evictBlock(key)) { 216 evicted++; 217 } 218 } 219 if (victimCache != null) { 220 evicted += victimCache.evictBlocksByHfileName(hfileName); 221 } 222 return evicted; 223 } 224 225 @Override 226 public CacheStats getStats() { 227 return stats; 228 } 229 230 @Override 231 public void shutdown() { 232 if (victimCache != null) { 233 victimCache.shutdown(); 234 } 235 statsThreadPool.shutdown(); 236 } 237 238 @Override 239 public BlockCache[] getBlockCaches() { 240 return null; 241 } 242 243 @Override 244 public Iterator<CachedBlock> iterator() { 245 long now = System.nanoTime(); 246 return cache.asMap().entrySet().stream() 247 .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now)) 248 .iterator(); 249 } 250 251 private void logStats() { 252 LOG.info( 253 "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + 254 "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " + 255 "max=" + StringUtils.byteDesc(size()) + ", " + 256 "blockCount=" + getBlockCount() + ", " + 257 "accesses=" + stats.getRequestCount() + ", " + 258 "hits=" + stats.getHitCount() + ", " + 259 "hitRatio=" + (stats.getHitCount() == 0 ? 260 "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") + 261 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 262 "cachingHits=" + stats.getHitCachingCount() + ", " + 263 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 264 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 265 "evictions=" + stats.getEvictionCount() + ", " + 266 "evicted=" + stats.getEvictedCount()); 267 } 268 269 @Override 270 public String toString() { 271 return MoreObjects.toStringHelper(this) 272 .add("blockCount", getBlockCount()) 273 .add("currentSize", getCurrentSize()) 274 .add("freeSize", getFreeSize()) 275 .add("maxSize", size()) 276 .add("heapSize", heapSize()) 277 .add("victimCache", (victimCache != null)) 278 .toString(); 279 } 280 281 /** A removal listener to asynchronously record evictions and populate the victim cache. */ 282 private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> { 283 284 @Override 285 public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) { 286 if (!cause.wasEvicted()) { 287 // An explicit eviction (invalidation) is not added to the victim cache as the data may 288 // no longer be valid for subsequent queries. 289 return; 290 } 291 292 recordEviction(); 293 294 if (victimCache == null) { 295 return; 296 } else if (victimCache instanceof BucketCache) { 297 BucketCache victimBucketCache = (BucketCache) victimCache; 298 victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true); 299 } else { 300 victimCache.cacheBlock(key, value); 301 } 302 } 303 } 304 305 /** 306 * Records an eviction. The number of eviction operations and evicted blocks are identical, as 307 * an eviction is triggered immediately when the capacity has been exceeded. An eviction is 308 * performed asynchronously. See the library's documentation for details on write buffers, 309 * batching, and maintenance behavior. 310 */ 311 private void recordEviction() { 312 // FIXME: Currently does not capture the insertion time 313 stats.evicted(Long.MAX_VALUE, true); 314 stats.evict(); 315 } 316 317 private static final class CachedBlockView implements CachedBlock { 318 private static final Comparator<CachedBlock> COMPARATOR = Comparator 319 .comparing(CachedBlock::getFilename) 320 .thenComparing(CachedBlock::getOffset) 321 .thenComparing(CachedBlock::getCachedTime); 322 323 private final BlockCacheKey key; 324 private final Cacheable value; 325 private final long now; 326 327 public CachedBlockView(BlockCacheKey key, Cacheable value, long now) { 328 this.now = now; 329 this.key = key; 330 this.value = value; 331 } 332 333 @Override 334 public BlockPriority getBlockPriority() { 335 // This does not appear to be used in any meaningful way and is irrelevant to this cache 336 return BlockPriority.MEMORY; 337 } 338 339 @Override 340 public BlockType getBlockType() { 341 return value.getBlockType(); 342 } 343 344 @Override 345 public long getOffset() { 346 return key.getOffset(); 347 } 348 349 @Override 350 public long getSize() { 351 return value.heapSize(); 352 } 353 354 @Override 355 public long getCachedTime() { 356 // This does not appear to be used in any meaningful way, so not captured 357 return 0L; 358 } 359 360 @Override 361 public String getFilename() { 362 return key.getHfileName(); 363 } 364 365 @Override 366 public int compareTo(CachedBlock other) { 367 return COMPARATOR.compare(this, other); 368 } 369 370 @Override 371 public boolean equals(Object obj) { 372 if (obj == this) { 373 return true; 374 } else if (!(obj instanceof CachedBlock)) { 375 return false; 376 } 377 CachedBlock other = (CachedBlock) obj; 378 return compareTo(other) == 0; 379 } 380 381 @Override 382 public int hashCode() { 383 return key.hashCode(); 384 } 385 386 @Override 387 public String toString() { 388 return BlockCacheUtil.toString(this, now); 389 } 390 } 391 392 @Override 393 public long getMaxSize() { 394 return size(); 395 } 396 397 @Override 398 public long getCurrentDataSize() { 399 return getCurrentSize(); 400 } 401 402 @Override 403 public long getDataBlockCount() { 404 return getBlockCount(); 405 } 406}