001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.util.Comparator; 023import java.util.Iterator; 024import java.util.concurrent.Executor; 025import java.util.concurrent.Executors; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.TimeUnit; 028 029import com.github.benmanes.caffeine.cache.Cache; 030import com.github.benmanes.caffeine.cache.Caffeine; 031import com.github.benmanes.caffeine.cache.Policy.Eviction; 032import com.github.benmanes.caffeine.cache.RemovalCause; 033import com.github.benmanes.caffeine.cache.RemovalListener; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.io.HeapSize; 037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 038import org.apache.hadoop.util.StringUtils; 039import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 041import org.apache.yetus.audience.InterfaceAudience; 042 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU 048 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide 049 * O(1) read and write operations. 050 * <ul> 051 * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> 052 * <li>Caffeine: https://github.com/ben-manes/caffeine</li> 053 * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> 054 * </ul> 055 */ 056@InterfaceAudience.Private 057public final class TinyLfuBlockCache implements FirstLevelBlockCache { 058 private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class); 059 060 private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size"; 061 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 062 private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60; 063 064 private final Eviction<BlockCacheKey, Cacheable> policy; 065 private final ScheduledExecutorService statsThreadPool; 066 private final long maxBlockSize; 067 private final CacheStats stats; 068 069 private BlockCache victimCache; 070 071 final Cache<BlockCacheKey, Cacheable> cache; 072 073 /** 074 * Creates a block cache. 075 * 076 * @param maximumSizeInBytes maximum size of this cache, in bytes 077 * @param avgBlockSize expected average size of blocks, in bytes 078 * @param executor the cache's executor 079 * @param conf additional configuration 080 */ 081 public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, 082 Executor executor, Configuration conf) { 083 this(maximumSizeInBytes, avgBlockSize, 084 conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor); 085 } 086 087 /** 088 * Creates a block cache. 089 * 090 * @param maximumSizeInBytes maximum size of this cache, in bytes 091 * @param avgBlockSize expected average size of blocks, in bytes 092 * @param maxBlockSize maximum size of a block, in bytes 093 * @param executor the cache's executor 094 */ 095 public TinyLfuBlockCache(long maximumSizeInBytes, 096 long avgBlockSize, long maxBlockSize, Executor executor) { 097 this.cache = Caffeine.newBuilder() 098 .executor(executor) 099 .maximumWeight(maximumSizeInBytes) 100 .removalListener(new EvictionListener()) 101 .weigher((BlockCacheKey key, Cacheable value) -> 102 (int) Math.min(value.heapSize(), Integer.MAX_VALUE)) 103 .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)) 104 .build(); 105 this.maxBlockSize = maxBlockSize; 106 this.policy = cache.policy().eviction().get(); 107 this.stats = new CacheStats(getClass().getSimpleName()); 108 109 statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 110 .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build()); 111 statsThreadPool.scheduleAtFixedRate(this::logStats, 112 STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS); 113 } 114 115 @Override 116 public void setVictimCache(BlockCache victimCache) { 117 if (this.victimCache != null) { 118 throw new IllegalArgumentException("The victim cache has already been set"); 119 } 120 this.victimCache = requireNonNull(victimCache); 121 } 122 123 @Override 124 public long size() { 125 return policy.getMaximum(); 126 } 127 128 @Override 129 public long getFreeSize() { 130 return size() - getCurrentSize(); 131 } 132 133 @Override 134 public long getCurrentSize() { 135 return policy.weightedSize().getAsLong(); 136 } 137 138 @Override 139 public long getBlockCount() { 140 return cache.estimatedSize(); 141 } 142 143 @Override 144 public long heapSize() { 145 return getCurrentSize(); 146 } 147 148 @Override 149 public void setMaxSize(long size) { 150 policy.setMaximum(size); 151 } 152 153 @Override 154 public boolean containsBlock(BlockCacheKey cacheKey) { 155 return cache.asMap().containsKey(cacheKey); 156 } 157 158 @Override 159 public Cacheable getBlock(BlockCacheKey cacheKey, 160 boolean caching, boolean repeat, boolean updateCacheMetrics) { 161 Cacheable value = cache.getIfPresent(cacheKey); 162 if (value == null) { 163 if (repeat) { 164 return null; 165 } 166 if (updateCacheMetrics) { 167 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 168 } 169 if (victimCache != null) { 170 value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 171 if ((value != null) && caching) { 172 if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) { 173 value = HFileBlock.deepCloneOnHeap((HFileBlock) value); 174 } 175 cacheBlock(cacheKey, value); 176 } 177 } 178 } else if (updateCacheMetrics) { 179 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 180 } 181 return value; 182 } 183 184 @Override 185 public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) { 186 cacheBlock(cacheKey, value); 187 } 188 189 @Override 190 public void cacheBlock(BlockCacheKey key, Cacheable value) { 191 if (value.heapSize() > maxBlockSize) { 192 // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged) 193 if (stats.failInsert() % 50 == 0) { 194 LOG.warn(String.format( 195 "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d", 196 key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE)); 197 } 198 } else { 199 cache.put(key, value); 200 } 201 } 202 203 @Override 204 public boolean evictBlock(BlockCacheKey cacheKey) { 205 Cacheable value = cache.asMap().remove(cacheKey); 206 return (value != null); 207 } 208 209 @Override 210 public int evictBlocksByHfileName(String hfileName) { 211 int evicted = 0; 212 for (BlockCacheKey key : cache.asMap().keySet()) { 213 if (key.getHfileName().equals(hfileName) && evictBlock(key)) { 214 evicted++; 215 } 216 } 217 if (victimCache != null) { 218 evicted += victimCache.evictBlocksByHfileName(hfileName); 219 } 220 return evicted; 221 } 222 223 @Override 224 public CacheStats getStats() { 225 return stats; 226 } 227 228 @Override 229 public void shutdown() { 230 if (victimCache != null) { 231 victimCache.shutdown(); 232 } 233 statsThreadPool.shutdown(); 234 } 235 236 @Override 237 public BlockCache[] getBlockCaches() { 238 return null; 239 } 240 241 @Override 242 public Iterator<CachedBlock> iterator() { 243 long now = System.nanoTime(); 244 return cache.asMap().entrySet().stream() 245 .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now)) 246 .iterator(); 247 } 248 249 private void logStats() { 250 LOG.info( 251 "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + 252 "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " + 253 "max=" + StringUtils.byteDesc(size()) + ", " + 254 "blockCount=" + getBlockCount() + ", " + 255 "accesses=" + stats.getRequestCount() + ", " + 256 "hits=" + stats.getHitCount() + ", " + 257 "hitRatio=" + (stats.getHitCount() == 0 ? 258 "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") + 259 "cachingAccesses=" + stats.getRequestCachingCount() + ", " + 260 "cachingHits=" + stats.getHitCachingCount() + ", " + 261 "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 262 "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + 263 "evictions=" + stats.getEvictionCount() + ", " + 264 "evicted=" + stats.getEvictedCount()); 265 } 266 267 @Override 268 public String toString() { 269 return MoreObjects.toStringHelper(this) 270 .add("blockCount", getBlockCount()) 271 .add("currentSize", getCurrentSize()) 272 .add("freeSize", getFreeSize()) 273 .add("maxSize", size()) 274 .add("heapSize", heapSize()) 275 .add("victimCache", (victimCache != null)) 276 .toString(); 277 } 278 279 /** A removal listener to asynchronously record evictions and populate the victim cache. */ 280 private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> { 281 282 @Override 283 public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) { 284 if (!cause.wasEvicted()) { 285 // An explicit eviction (invalidation) is not added to the victim cache as the data may 286 // no longer be valid for subsequent queries. 287 return; 288 } 289 290 recordEviction(); 291 292 if (victimCache == null) { 293 return; 294 } else if (victimCache instanceof BucketCache) { 295 BucketCache victimBucketCache = (BucketCache) victimCache; 296 victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true); 297 } else { 298 victimCache.cacheBlock(key, value); 299 } 300 } 301 } 302 303 /** 304 * Records an eviction. The number of eviction operations and evicted blocks are identical, as 305 * an eviction is triggered immediately when the capacity has been exceeded. An eviction is 306 * performed asynchronously. See the library's documentation for details on write buffers, 307 * batching, and maintenance behavior. 308 */ 309 private void recordEviction() { 310 // FIXME: Currently does not capture the insertion time 311 stats.evicted(Long.MAX_VALUE, true); 312 stats.evict(); 313 } 314 315 private static final class CachedBlockView implements CachedBlock { 316 private static final Comparator<CachedBlock> COMPARATOR = Comparator 317 .comparing(CachedBlock::getFilename) 318 .thenComparing(CachedBlock::getOffset) 319 .thenComparing(CachedBlock::getCachedTime); 320 321 private final BlockCacheKey key; 322 private final Cacheable value; 323 private final long now; 324 325 public CachedBlockView(BlockCacheKey key, Cacheable value, long now) { 326 this.now = now; 327 this.key = key; 328 this.value = value; 329 } 330 331 @Override 332 public BlockPriority getBlockPriority() { 333 // This does not appear to be used in any meaningful way and is irrelevant to this cache 334 return BlockPriority.MEMORY; 335 } 336 337 @Override 338 public BlockType getBlockType() { 339 return value.getBlockType(); 340 } 341 342 @Override 343 public long getOffset() { 344 return key.getOffset(); 345 } 346 347 @Override 348 public long getSize() { 349 return value.heapSize(); 350 } 351 352 @Override 353 public long getCachedTime() { 354 // This does not appear to be used in any meaningful way, so not captured 355 return 0L; 356 } 357 358 @Override 359 public String getFilename() { 360 return key.getHfileName(); 361 } 362 363 @Override 364 public int compareTo(CachedBlock other) { 365 return COMPARATOR.compare(this, other); 366 } 367 368 @Override 369 public boolean equals(Object obj) { 370 if (obj == this) { 371 return true; 372 } else if (!(obj instanceof CachedBlock)) { 373 return false; 374 } 375 CachedBlock other = (CachedBlock) obj; 376 return compareTo(other) == 0; 377 } 378 379 @Override 380 public int hashCode() { 381 return key.hashCode(); 382 } 383 384 @Override 385 public String toString() { 386 return BlockCacheUtil.toString(this, now); 387 } 388 } 389 390 @Override 391 public long getMaxSize() { 392 return size(); 393 } 394 395 @Override 396 public long getCurrentDataSize() { 397 return getCurrentSize(); 398 } 399 400 @Override 401 public long getDataBlockCount() { 402 return getBlockCount(); 403 } 404}