001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static java.util.Objects.requireNonNull;
021
022import java.util.Comparator;
023import java.util.Iterator;
024import java.util.concurrent.Executor;
025import java.util.concurrent.Executors;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.TimeUnit;
028
029import com.github.benmanes.caffeine.cache.Cache;
030import com.github.benmanes.caffeine.cache.Caffeine;
031import com.github.benmanes.caffeine.cache.Policy.Eviction;
032import com.github.benmanes.caffeine.cache.RemovalCause;
033import com.github.benmanes.caffeine.cache.RemovalListener;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.io.HeapSize;
037import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
038import org.apache.hadoop.util.StringUtils;
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
042import org.apache.yetus.audience.InterfaceAudience;
043
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
049 * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
050 * O(1) read and write operations.
051 * <ul>
052 *   <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
053 *   <li>Caffeine: https://github.com/ben-manes/caffeine</li>
054 *   <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
055 * </ul>
056 */
057@InterfaceAudience.Private
058public final class TinyLfuBlockCache implements FirstLevelBlockCache {
059  private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);
060
061  private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
062  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
063  private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
064
065  private final Eviction<BlockCacheKey, Cacheable> policy;
066  private final ScheduledExecutorService statsThreadPool;
067  private final long maxBlockSize;
068  private final CacheStats stats;
069
070  private BlockCache victimCache;
071
072  @VisibleForTesting
073  final Cache<BlockCacheKey, Cacheable> cache;
074
075  /**
076   * Creates a block cache.
077   *
078   * @param maximumSizeInBytes maximum size of this cache, in bytes
079   * @param avgBlockSize expected average size of blocks, in bytes
080   * @param executor the cache's executor
081   * @param conf additional configuration
082   */
083  public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
084      Executor executor, Configuration conf) {
085    this(maximumSizeInBytes, avgBlockSize,
086        conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
087  }
088
089  /**
090   * Creates a block cache.
091   *
092   * @param maximumSizeInBytes maximum size of this cache, in bytes
093   * @param avgBlockSize expected average size of blocks, in bytes
094   * @param maxBlockSize maximum size of a block, in bytes
095   * @param executor the cache's executor
096   */
097  public TinyLfuBlockCache(long maximumSizeInBytes,
098      long avgBlockSize, long maxBlockSize, Executor executor) {
099    this.cache = Caffeine.newBuilder()
100        .executor(executor)
101        .maximumWeight(maximumSizeInBytes)
102        .removalListener(new EvictionListener())
103        .weigher((BlockCacheKey key, Cacheable value) ->
104            (int) Math.min(value.heapSize(), Integer.MAX_VALUE))
105        .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
106        .build();
107    this.maxBlockSize = maxBlockSize;
108    this.policy = cache.policy().eviction().get();
109    this.stats = new CacheStats(getClass().getSimpleName());
110
111    statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
112        .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
113    statsThreadPool.scheduleAtFixedRate(this::logStats,
114        STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
115  }
116
117  @Override
118  public void setVictimCache(BlockCache victimCache) {
119    if (this.victimCache != null) {
120      throw new IllegalArgumentException("The victim cache has already been set");
121    }
122    this.victimCache = requireNonNull(victimCache);
123  }
124
125  @Override
126  public long size() {
127    return policy.getMaximum();
128  }
129
130  @Override
131  public long getFreeSize() {
132    return size() - getCurrentSize();
133  }
134
135  @Override
136  public long getCurrentSize() {
137    return policy.weightedSize().getAsLong();
138  }
139
140  @Override
141  public long getBlockCount() {
142    return cache.estimatedSize();
143  }
144
145  @Override
146  public long heapSize() {
147    return getCurrentSize();
148  }
149
150  @Override
151  public void setMaxSize(long size) {
152    policy.setMaximum(size);
153  }
154
155  @Override
156  public boolean containsBlock(BlockCacheKey cacheKey) {
157    return cache.asMap().containsKey(cacheKey);
158  }
159
160  @Override
161  public Cacheable getBlock(BlockCacheKey cacheKey,
162      boolean caching, boolean repeat, boolean updateCacheMetrics) {
163    Cacheable value = cache.getIfPresent(cacheKey);
164    if (value == null) {
165      if (repeat) {
166        return null;
167      }
168      if (updateCacheMetrics) {
169        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
170      }
171      if (victimCache != null) {
172        value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
173        if ((value != null) && caching) {
174          if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
175            value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
176          }
177          cacheBlock(cacheKey, value);
178        }
179      }
180    } else if (updateCacheMetrics) {
181      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
182    }
183    return value;
184  }
185
186  @Override
187  public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
188    cacheBlock(cacheKey, value);
189  }
190
191  @Override
192  public void cacheBlock(BlockCacheKey key, Cacheable value) {
193    if (value.heapSize() > maxBlockSize) {
194      // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
195      if (stats.failInsert() % 50 == 0) {
196        LOG.warn(String.format(
197            "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
198            key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
199      }
200    } else {
201      cache.put(key, value);
202    }
203  }
204
205  @Override
206  public boolean evictBlock(BlockCacheKey cacheKey) {
207    Cacheable value = cache.asMap().remove(cacheKey);
208    return (value != null);
209  }
210
211  @Override
212  public int evictBlocksByHfileName(String hfileName) {
213    int evicted = 0;
214    for (BlockCacheKey key : cache.asMap().keySet()) {
215      if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
216        evicted++;
217      }
218    }
219    if (victimCache != null) {
220      evicted += victimCache.evictBlocksByHfileName(hfileName);
221    }
222    return evicted;
223  }
224
225  @Override
226  public CacheStats getStats() {
227    return stats;
228  }
229
230  @Override
231  public void shutdown() {
232    if (victimCache != null) {
233      victimCache.shutdown();
234    }
235    statsThreadPool.shutdown();
236  }
237
238  @Override
239  public BlockCache[] getBlockCaches() {
240    return null;
241  }
242
243  @Override
244  public Iterator<CachedBlock> iterator() {
245    long now = System.nanoTime();
246    return cache.asMap().entrySet().stream()
247        .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
248        .iterator();
249  }
250
251  private void logStats() {
252    LOG.info(
253        "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
254        "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
255        "max=" + StringUtils.byteDesc(size()) + ", " +
256        "blockCount=" + getBlockCount() + ", " +
257        "accesses=" + stats.getRequestCount() + ", " +
258        "hits=" + stats.getHitCount() + ", " +
259        "hitRatio=" + (stats.getHitCount() == 0 ?
260          "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
261        "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
262        "cachingHits=" + stats.getHitCachingCount() + ", " +
263        "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
264          "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
265        "evictions=" + stats.getEvictionCount() + ", " +
266        "evicted=" + stats.getEvictedCount());
267  }
268
269  @Override
270  public String toString() {
271    return MoreObjects.toStringHelper(this)
272      .add("blockCount", getBlockCount())
273      .add("currentSize", getCurrentSize())
274      .add("freeSize", getFreeSize())
275      .add("maxSize", size())
276      .add("heapSize", heapSize())
277      .add("victimCache", (victimCache != null))
278      .toString();
279  }
280
281  /** A removal listener to asynchronously record evictions and populate the victim cache. */
282  private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
283
284    @Override
285    public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
286      if (!cause.wasEvicted()) {
287        // An explicit eviction (invalidation) is not added to the victim cache as the data may
288        // no longer be valid for subsequent queries.
289        return;
290      }
291
292      recordEviction();
293
294      if (victimCache == null) {
295        return;
296      } else if (victimCache instanceof BucketCache) {
297        BucketCache victimBucketCache = (BucketCache) victimCache;
298        victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
299      } else {
300        victimCache.cacheBlock(key, value);
301      }
302    }
303  }
304
305  /**
306   * Records an eviction. The number of eviction operations and evicted blocks are identical, as
307   * an eviction is triggered immediately when the capacity has been exceeded. An eviction is
308   * performed asynchronously. See the library's documentation for details on write buffers,
309   * batching, and maintenance behavior.
310   */
311  private void recordEviction() {
312    // FIXME: Currently does not capture the insertion time
313    stats.evicted(Long.MAX_VALUE, true);
314    stats.evict();
315  }
316
317  private static final class CachedBlockView implements CachedBlock {
318    private static final Comparator<CachedBlock> COMPARATOR = Comparator
319        .comparing(CachedBlock::getFilename)
320        .thenComparing(CachedBlock::getOffset)
321        .thenComparing(CachedBlock::getCachedTime);
322
323    private final BlockCacheKey key;
324    private final Cacheable value;
325    private final long now;
326
327    public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
328      this.now = now;
329      this.key = key;
330      this.value = value;
331    }
332
333    @Override
334    public BlockPriority getBlockPriority() {
335      // This does not appear to be used in any meaningful way and is irrelevant to this cache
336      return BlockPriority.MEMORY;
337    }
338
339    @Override
340    public BlockType getBlockType() {
341      return value.getBlockType();
342    }
343
344    @Override
345    public long getOffset() {
346      return key.getOffset();
347    }
348
349    @Override
350    public long getSize() {
351      return value.heapSize();
352    }
353
354    @Override
355    public long getCachedTime() {
356      // This does not appear to be used in any meaningful way, so not captured
357      return 0L;
358    }
359
360    @Override
361    public String getFilename() {
362      return key.getHfileName();
363    }
364
365    @Override
366    public int compareTo(CachedBlock other) {
367      return COMPARATOR.compare(this, other);
368    }
369
370    @Override
371    public boolean equals(Object obj) {
372      if (obj == this) {
373        return true;
374      } else if (!(obj instanceof CachedBlock)) {
375        return false;
376      }
377      CachedBlock other = (CachedBlock) obj;
378      return compareTo(other) == 0;
379    }
380
381    @Override
382    public int hashCode() {
383      return key.hashCode();
384    }
385
386    @Override
387    public String toString() {
388      return BlockCacheUtil.toString(this, now);
389    }
390  }
391
392  @Override
393  public long getMaxSize() {
394    return size();
395  }
396
397  @Override
398  public long getCurrentDataSize() {
399    return getCurrentSize();
400  }
401
402  @Override
403  public long getDataBlockCount() {
404    return getBlockCount();
405  }
406}