001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.util.Comparator;
023import java.util.Iterator;
024import java.util.concurrent.Executor;
025import java.util.concurrent.Executors;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.TimeUnit;
028
029import com.github.benmanes.caffeine.cache.Cache;
030import com.github.benmanes.caffeine.cache.Caffeine;
031import com.github.benmanes.caffeine.cache.Policy.Eviction;
032import com.github.benmanes.caffeine.cache.RemovalCause;
033import com.github.benmanes.caffeine.cache.RemovalListener;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.io.HeapSize;
037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
038import org.apache.hadoop.util.StringUtils;
039import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041import org.apache.yetus.audience.InterfaceAudience;
042
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
048 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
049 * O(1) read and write operations.
050 * <ul>
051 *   <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
052 *   <li>Caffeine: https://github.com/ben-manes/caffeine</li>
053 *   <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
054 * </ul>
055 */
056@InterfaceAudience.Private
057public final class TinyLfuBlockCache implements FirstLevelBlockCache {
058  private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);
059
060  private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
061  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
062  private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
063
064  private final Eviction<BlockCacheKey, Cacheable> policy;
065  private final ScheduledExecutorService statsThreadPool;
066  private final long maxBlockSize;
067  private final CacheStats stats;
068
069  private BlockCache victimCache;
070
071  final Cache<BlockCacheKey, Cacheable> cache;
072
073  /**
074   * Creates a block cache.
075   *
076   * @param maximumSizeInBytes maximum size of this cache, in bytes
077   * @param avgBlockSize expected average size of blocks, in bytes
078   * @param executor the cache's executor
079   * @param conf additional configuration
080   */
081  public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
082      Executor executor, Configuration conf) {
083    this(maximumSizeInBytes, avgBlockSize,
084        conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
085  }
086
087  /**
088   * Creates a block cache.
089   *
090   * @param maximumSizeInBytes maximum size of this cache, in bytes
091   * @param avgBlockSize expected average size of blocks, in bytes
092   * @param maxBlockSize maximum size of a block, in bytes
093   * @param executor the cache's executor
094   */
095  public TinyLfuBlockCache(long maximumSizeInBytes,
096      long avgBlockSize, long maxBlockSize, Executor executor) {
097    this.cache = Caffeine.newBuilder()
098        .executor(executor)
099        .maximumWeight(maximumSizeInBytes)
100        .removalListener(new EvictionListener())
101        .weigher((BlockCacheKey key, Cacheable value) ->
102            (int) Math.min(value.heapSize(), Integer.MAX_VALUE))
103        .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
104        .build();
105    this.maxBlockSize = maxBlockSize;
106    this.policy = cache.policy().eviction().get();
107    this.stats = new CacheStats(getClass().getSimpleName());
108
109    statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
110        .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
111    statsThreadPool.scheduleAtFixedRate(this::logStats,
112        STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
113  }
114
115  @Override
116  public void setVictimCache(BlockCache victimCache) {
117    if (this.victimCache != null) {
118      throw new IllegalArgumentException("The victim cache has already been set");
119    }
120    this.victimCache = requireNonNull(victimCache);
121  }
122
123  @Override
124  public long size() {
125    return policy.getMaximum();
126  }
127
128  @Override
129  public long getFreeSize() {
130    return size() - getCurrentSize();
131  }
132
133  @Override
134  public long getCurrentSize() {
135    return policy.weightedSize().getAsLong();
136  }
137
138  @Override
139  public long getBlockCount() {
140    return cache.estimatedSize();
141  }
142
143  @Override
144  public long heapSize() {
145    return getCurrentSize();
146  }
147
148  @Override
149  public void setMaxSize(long size) {
150    policy.setMaximum(size);
151  }
152
153  @Override
154  public boolean containsBlock(BlockCacheKey cacheKey) {
155    return cache.asMap().containsKey(cacheKey);
156  }
157
158  @Override
159  public Cacheable getBlock(BlockCacheKey cacheKey,
160      boolean caching, boolean repeat, boolean updateCacheMetrics) {
161    Cacheable value = cache.getIfPresent(cacheKey);
162    if (value == null) {
163      if (repeat) {
164        return null;
165      }
166      if (updateCacheMetrics) {
167        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
168      }
169      if (victimCache != null) {
170        value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
171        if ((value != null) && caching) {
172          if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
173            value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
174          }
175          cacheBlock(cacheKey, value);
176        }
177      }
178    } else if (updateCacheMetrics) {
179      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
180    }
181    return value;
182  }
183
184  @Override
185  public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
186    cacheBlock(cacheKey, value);
187  }
188
189  @Override
190  public void cacheBlock(BlockCacheKey key, Cacheable value) {
191    if (value.heapSize() > maxBlockSize) {
192      // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
193      if (stats.failInsert() % 50 == 0) {
194        LOG.warn(String.format(
195            "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
196            key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
197      }
198    } else {
199      cache.put(key, value);
200    }
201  }
202
203  @Override
204  public boolean evictBlock(BlockCacheKey cacheKey) {
205    Cacheable value = cache.asMap().remove(cacheKey);
206    return (value != null);
207  }
208
209  @Override
210  public int evictBlocksByHfileName(String hfileName) {
211    int evicted = 0;
212    for (BlockCacheKey key : cache.asMap().keySet()) {
213      if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
214        evicted++;
215      }
216    }
217    if (victimCache != null) {
218      evicted += victimCache.evictBlocksByHfileName(hfileName);
219    }
220    return evicted;
221  }
222
223  @Override
224  public CacheStats getStats() {
225    return stats;
226  }
227
228  @Override
229  public void shutdown() {
230    if (victimCache != null) {
231      victimCache.shutdown();
232    }
233    statsThreadPool.shutdown();
234  }
235
236  @Override
237  public BlockCache[] getBlockCaches() {
238    return null;
239  }
240
241  @Override
242  public Iterator<CachedBlock> iterator() {
243    long now = System.nanoTime();
244    return cache.asMap().entrySet().stream()
245        .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
246        .iterator();
247  }
248
249  private void logStats() {
250    LOG.info(
251        "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
252        "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
253        "max=" + StringUtils.byteDesc(size()) + ", " +
254        "blockCount=" + getBlockCount() + ", " +
255        "accesses=" + stats.getRequestCount() + ", " +
256        "hits=" + stats.getHitCount() + ", " +
257        "hitRatio=" + (stats.getHitCount() == 0 ?
258          "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
259        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
260        "cachingHits=" + stats.getHitCachingCount() + ", " +
261        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
262          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
263        "evictions=" + stats.getEvictionCount() + ", " +
264        "evicted=" + stats.getEvictedCount());
265  }
266
267  @Override
268  public String toString() {
269    return MoreObjects.toStringHelper(this)
270      .add("blockCount", getBlockCount())
271      .add("currentSize", getCurrentSize())
272      .add("freeSize", getFreeSize())
273      .add("maxSize", size())
274      .add("heapSize", heapSize())
275      .add("victimCache", (victimCache != null))
276      .toString();
277  }
278
279  /** A removal listener to asynchronously record evictions and populate the victim cache. */
280  private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
281
282    @Override
283    public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
284      if (!cause.wasEvicted()) {
285        // An explicit eviction (invalidation) is not added to the victim cache as the data may
286        // no longer be valid for subsequent queries.
287        return;
288      }
289
290      recordEviction();
291
292      if (victimCache == null) {
293        return;
294      } else if (victimCache instanceof BucketCache) {
295        BucketCache victimBucketCache = (BucketCache) victimCache;
296        victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
297      } else {
298        victimCache.cacheBlock(key, value);
299      }
300    }
301  }
302
303  /**
304   * Records an eviction. The number of eviction operations and evicted blocks are identical, as
305   * an eviction is triggered immediately when the capacity has been exceeded. An eviction is
306   * performed asynchronously. See the library's documentation for details on write buffers,
307   * batching, and maintenance behavior.
308   */
309  private void recordEviction() {
310    // FIXME: Currently does not capture the insertion time
311    stats.evicted(Long.MAX_VALUE, true);
312    stats.evict();
313  }
314
315  private static final class CachedBlockView implements CachedBlock {
316    private static final Comparator<CachedBlock> COMPARATOR = Comparator
317        .comparing(CachedBlock::getFilename)
318        .thenComparing(CachedBlock::getOffset)
319        .thenComparing(CachedBlock::getCachedTime);
320
321    private final BlockCacheKey key;
322    private final Cacheable value;
323    private final long now;
324
325    public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
326      this.now = now;
327      this.key = key;
328      this.value = value;
329    }
330
331    @Override
332    public BlockPriority getBlockPriority() {
333      // This does not appear to be used in any meaningful way and is irrelevant to this cache
334      return BlockPriority.MEMORY;
335    }
336
337    @Override
338    public BlockType getBlockType() {
339      return value.getBlockType();
340    }
341
342    @Override
343    public long getOffset() {
344      return key.getOffset();
345    }
346
347    @Override
348    public long getSize() {
349      return value.heapSize();
350    }
351
352    @Override
353    public long getCachedTime() {
354      // This does not appear to be used in any meaningful way, so not captured
355      return 0L;
356    }
357
358    @Override
359    public String getFilename() {
360      return key.getHfileName();
361    }
362
363    @Override
364    public int compareTo(CachedBlock other) {
365      return COMPARATOR.compare(this, other);
366    }
367
368    @Override
369    public boolean equals(Object obj) {
370      if (obj == this) {
371        return true;
372      } else if (!(obj instanceof CachedBlock)) {
373        return false;
374      }
375      CachedBlock other = (CachedBlock) obj;
376      return compareTo(other) == 0;
377    }
378
379    @Override
380    public int hashCode() {
381      return key.hashCode();
382    }
383
384    @Override
385    public String toString() {
386      return BlockCacheUtil.toString(this, now);
387    }
388  }
389
390  @Override
391  public long getMaxSize() {
392    return size();
393  }
394
395  @Override
396  public long getCurrentDataSize() {
397    return getCurrentSize();
398  }
399
400  @Override
401  public long getDataBlockCount() {
402    return getBlockCount();
403  }
404}