/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static java.util.Objects.requireNonNull;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
 * O(1) read and write operations.
 * <ul>
 * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
 * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
 * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
 * </ul>
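 * <p>
 * A minimal usage sketch, illustrative only: in HBase this cache is normally created and wired up
 * by the block cache factory rather than constructed directly, and the sizes, the same-thread
 * executor, and the {@code exampleBlock} placeholder below are arbitrary assumptions.
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * long maxSize = 512L * 1024 * 1024; // total capacity, in bytes
 * long avgBlockSize = 64L * 1024;    // expected average block size, in bytes
 * FirstLevelBlockCache l1 = new TinyLfuBlockCache(maxSize, avgBlockSize, Runnable::run, conf);
 *
 * BlockCacheKey key = new BlockCacheKey("example-hfile", 0);
 * l1.cacheBlock(key, exampleBlock); // exampleBlock: some Cacheable, e.g. an HFileBlock
 *
 * // getBlock retains the block on behalf of the caller, who must release it when done.
 * Cacheable cached = l1.getBlock(key, true, false, true);
 * if (cached != null) {
 *   try {
 *     // use the block
 *   } finally {
 *     cached.release();
 *   }
 * }
 * }</pre>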
 */
@InterfaceAudience.Private
public final class TinyLfuBlockCache implements FirstLevelBlockCache {
  private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);

  private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
  private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;

  private transient final Eviction<BlockCacheKey, Cacheable> policy;
  private transient final ScheduledExecutorService statsThreadPool;
  private final long maxBlockSize;
  private final CacheStats stats;

  private transient BlockCache victimCache;

  transient final Cache<BlockCacheKey, Cacheable> cache;

  /**
   * Creates a block cache.
   * @param maximumSizeInBytes maximum size of this cache, in bytes
   * @param avgBlockSize       expected average size of blocks, in bytes
   * @param executor           the cache's executor
   * @param conf               additional configuration
   */
  public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, Executor executor,
    Configuration conf) {
    this(maximumSizeInBytes, avgBlockSize, conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
      executor);
  }

  /**
   * Creates a block cache.
   * @param maximumSizeInBytes maximum size of this cache, in bytes
   * @param avgBlockSize       expected average size of blocks, in bytes
   * @param maxBlockSize       maximum size of a block, in bytes
   * @param executor           the cache's executor
   */
  public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, long maxBlockSize,
    Executor executor) {
    this.cache = Caffeine.newBuilder().executor(executor).maximumWeight(maximumSizeInBytes)
      .removalListener(new EvictionListener())
      .weigher(
        (BlockCacheKey key, Cacheable value) -> (int) Math.min(value.heapSize(), Integer.MAX_VALUE))
      .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)).build();
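    // Example (illustrative numbers, not defaults): with maximumSizeInBytes = 1 GB and
    // avgBlockSize = 64 KB, the initialCapacity above works out to ceil(1.2 * 2^30 / 2^16)
    // = 19,661 entries, sizing the hash table up front for the expected block count.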
    this.maxBlockSize = maxBlockSize;
    this.policy = cache.policy().eviction().get();
    this.stats = new CacheStats(getClass().getSimpleName());

    statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
      .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
    statsThreadPool.scheduleAtFixedRate(this::logStats, STAT_THREAD_PERIOD_SECONDS,
      STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
  }

  @Override
  public void setVictimCache(BlockCache victimCache) {
    if (this.victimCache != null) {
      throw new IllegalArgumentException("The victim cache has already been set");
    }
    this.victimCache = requireNonNull(victimCache);
  }

  @Override
  public long size() {
    return policy.getMaximum();
  }

  @Override
  public long getFreeSize() {
    return size() - getCurrentSize();
  }

  @Override
  public long getCurrentSize() {
    return policy.weightedSize().getAsLong();
  }

  @Override
  public long getBlockCount() {
    return cache.estimatedSize();
  }

  @Override
  public long heapSize() {
    return getCurrentSize();
  }

  @Override
  public void setMaxSize(long size) {
    policy.setMaximum(size);
  }

  @Override
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return cache.asMap().containsKey(cacheKey);
  }

  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    Cacheable value = cache.asMap().computeIfPresent(cacheKey, (blockCacheKey, cacheable) -> {
      // The block will be referenced by the RPC path, so increase its refCnt here. NOTICE: the
      // retain must happen inside this compute block. If we retained outside of
      // map#computeIfPresent, a concurrent evictBlock could remove and release the block first,
      // and we would then be retaining a block with refCnt=0, which is disallowed.
      cacheable.retain();
      return cacheable;
    });
    if (value == null) {
      if (repeat) {
        return null;
      }
      if (updateCacheMetrics) {
        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
      }
      if (victimCache != null) {
        value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
        if ((value != null) && caching) {
          cacheBlock(cacheKey, value);
        }
      }
    } else if (updateCacheMetrics) {
      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
    }
    return value;
  }

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
    cacheBlock(cacheKey, value);
  }

  @Override
  public void cacheBlock(BlockCacheKey key, Cacheable value) {
    if (value.heapSize() > maxBlockSize) {
      // If there are many oversized blocks this can make the logs too noisy, so only 2% are logged
      if (stats.failInsert() % 50 == 0) {
        LOG.warn(String.format(
          "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
          key.getHfileName(), key.getOffset(), value.heapSize(), maxBlockSize));
      }
    } else {
      value = asReferencedHeapBlock(value);
      cache.put(key, value);
    }
  }

  /**
   * The block cached in TinyLfuBlockCache will always be a heap block: on the one hand, heap
   * access is faster than off-heap, so the small index and meta blocks cached in
   * CombinedBlockCache benefit a lot; on the other hand, the TinyLfuBlockCache size is always
   * calculated from the total heap size, so caching an off-heap block here would throw the heap
   * accounting off. We therefore clone the block into a heap block if it is off-heap, and use the
   * original block otherwise. The key point is to maintain the block's refCnt (HBASE-22127): <br>
   * 1. if we cache the cloned heap block, its refCnt is a brand new one, which is easy to handle;
   * <br>
   * 2. if we cache the original heap block, we know it is not tracked in ByteBuffAllocator's
   * reservoir, so once both the RPC path and the TinyLfuBlockCache release the block it can be
   * garbage collected by the JVM; hence we need a retain here.
   * @param buf the original block
   * @return a block backed by heap memory
   */
  private Cacheable asReferencedHeapBlock(Cacheable buf) {
    if (buf instanceof HFileBlock) {
      HFileBlock blk = ((HFileBlock) buf);
      if (blk.isSharedMem()) {
        return HFileBlock.deepCloneOnHeap(blk);
      }
    }
    // The block will be referenced by this TinyLfuBlockCache, so increase its refCnt here.
    return buf.retain();
  }
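  // Illustration (comments only, assuming the read path arrives holding one reference on the
  // block, i.e. refCnt=1):
  //   Case 1, shared/off-heap block: deepCloneOnHeap returns a new heap block with its own fresh
  //     refCnt owned solely by this cache; the original off-heap buffer is left untouched and is
  //     released by the reader as usual.
  //   Case 2, exclusive heap block: retain() bumps the refCnt to 2 -- one reference for the
  //     reader, one for this cache -- so the block only becomes garbage collectable after both
  //     sides call release().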

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    Cacheable value = cache.asMap().remove(cacheKey);
    if (value != null) {
      value.release();
    }
    return (value != null);
  }

  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int evicted = 0;
    for (BlockCacheKey key : cache.asMap().keySet()) {
      if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
        evicted++;
      }
    }
    if (victimCache != null) {
      evicted += victimCache.evictBlocksByHfileName(hfileName);
    }
    return evicted;
  }

  @Override
  public CacheStats getStats() {
    return stats;
  }

  @Override
  public void shutdown() {
    if (victimCache != null) {
      victimCache.shutdown();
    }
    statsThreadPool.shutdown();
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    long now = System.nanoTime();
    return cache.asMap().entrySet().stream()
      .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
      .iterator();
  }

  private void logStats() {
    LOG.info("totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + "freeSize="
      + StringUtils.byteDesc(getFreeSize()) + ", " + "max=" + StringUtils.byteDesc(size()) + ", "
      + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", "
      + "hits=" + stats.getHitCount() + ", " + "hitRatio="
      + (stats.getHitCount() == 0 ? "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ")
      + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits="
      + stats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (stats.getHitCachingCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount());
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount())
      .add("currentSize", getCurrentSize()).add("freeSize", getFreeSize()).add("maxSize", size())
      .add("heapSize", heapSize()).add("victimCache", (victimCache != null)).toString();
  }

  /** A removal listener to asynchronously record evictions and populate the victim cache. */
  private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {

    @Override
    public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
      if (!cause.wasEvicted()) {
        // An explicit eviction (invalidation) is not added to the victim cache as the data may
        // no longer be valid for subsequent queries.
        return;
      }

      recordEviction();

      if (victimCache == null) {
        return;
      } else if (victimCache instanceof BucketCache) {
        BucketCache victimBucketCache = (BucketCache) victimCache;
        victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
      } else {
        victimCache.cacheBlock(key, value);
      }
    }
  }

  /**
   * Records an eviction. The numbers of eviction operations and evicted blocks are identical, as
   * an eviction is triggered immediately when the capacity has been exceeded. Evictions are
   * performed asynchronously; see the library's documentation for details on write buffers,
   * batching, and maintenance behavior.
   */
  private void recordEviction() {
    // FIXME: Currently does not capture the insertion time
    stats.evicted(Long.MAX_VALUE, true);
    stats.evict();
  }

  private static final class CachedBlockView implements CachedBlock {
    private static final Comparator<CachedBlock> COMPARATOR =
      Comparator.comparing(CachedBlock::getFilename).thenComparing(CachedBlock::getOffset)
        .thenComparing(CachedBlock::getCachedTime);

    private final BlockCacheKey key;
    private final Cacheable value;
    private final long now;

    public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
      this.now = now;
      this.key = key;
      this.value = value;
    }

    @Override
    public BlockPriority getBlockPriority() {
      // This does not appear to be used in any meaningful way and is irrelevant to this cache
      return BlockPriority.MEMORY;
    }

    @Override
    public BlockType getBlockType() {
      return value.getBlockType();
    }

    @Override
    public long getOffset() {
      return key.getOffset();
    }

    @Override
    public long getSize() {
      return value.heapSize();
    }

    @Override
    public long getCachedTime() {
      // This does not appear to be used in any meaningful way, so not captured
      return 0L;
    }

    @Override
    public String getFilename() {
      return key.getHfileName();
    }

    @Override
    public int compareTo(CachedBlock other) {
      return COMPARATOR.compare(this, other);
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      } else if (!(obj instanceof CachedBlock)) {
        return false;
      }
      CachedBlock other = (CachedBlock) obj;
      return compareTo(other) == 0;
    }

    @Override
    public int hashCode() {
      return key.hashCode();
    }

    @Override
    public String toString() {
      return BlockCacheUtil.toString(this, now);
    }
  }

  @Override
  public long getMaxSize() {
    return size();
  }

  @Override
  public long getCurrentDataSize() {
    return getCurrentSize();
  }

  @Override
  public long getDataBlockCount() {
    return getBlockCount();
  }
}