View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.io.hfile.slab;
21  
22  import java.nio.ByteBuffer;
23  import java.util.List;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.io.HeapSize;
31  import org.apache.hadoop.hbase.io.hfile.BlockCache;
32  import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
33  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
34  import org.apache.hadoop.hbase.io.hfile.CacheStats;
35  import org.apache.hadoop.hbase.io.hfile.Cacheable;
36  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.ClassSize;
39  import org.apache.hadoop.util.StringUtils;
40  
41  import com.google.common.cache.CacheBuilder;
42  import com.google.common.cache.RemovalListener;
43  import com.google.common.cache.RemovalNotification;
44  
45  /**
46   * SingleSizeCache is a slab allocated cache that caches elements up to a single
47   * size. It uses a slab allocator (Slab.java) to divide a direct bytebuffer
48   * into evenly sized blocks. Any cached data will take up exactly 1 block. An
49   * exception will be thrown if the cached data cannot fit into the blockSize of
50   * this SingleSizeCache.
51   *
52   * Eviction and LRU ordering are handled by Guava's CacheBuilder, which
53   * provides a bounded ConcurrentMap with size-based eviction.
54   *
55   **/
56  public class SingleSizeCache implements BlockCache, HeapSize {
57    private final Slab backingStore;
58    private final ConcurrentMap<BlockCacheKey, CacheablePair> backingMap;
59    private final int numBlocks;
60    private final int blockSize;
61    private final CacheStats stats;
62    private final SlabItemActionWatcher actionWatcher;
63    private final AtomicLong size;
64    private final AtomicLong timeSinceLastAccess;
65    public final static long CACHE_FIXED_OVERHEAD = ClassSize
66        .align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE)
67            + +ClassSize.OBJECT);
68  
69    static final Log LOG = LogFactory.getLog(SingleSizeCache.class);
70  
71    /**
72     * Default constructor. Specify the size of the blocks, number of blocks, and
73     * the SlabCache this cache will be assigned to.
74     *
75     *
76     * @param blockSize the size of each block, in bytes
77     *
78     * @param numBlocks the number of blocks of blockSize this cache will hold.
79     *
80     * @param master the SlabCache this SingleSlabCache is assigned to.
81     */
82    public SingleSizeCache(int blockSize, int numBlocks,
83        SlabItemActionWatcher master) {
84      this.blockSize = blockSize;
85      this.numBlocks = numBlocks;
86      backingStore = new Slab(blockSize, numBlocks);
87      this.stats = new CacheStats();
88      this.actionWatcher = master;
89      this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize());
90      this.timeSinceLastAccess = new AtomicLong();
91  
92      // This evictionListener is called whenever the cache automatically
93      // evicts
94      // something.
95      RemovalListener<BlockCacheKey, CacheablePair> listener =
96        new RemovalListener<BlockCacheKey, CacheablePair>() {
97          @Override
98          public void onRemoval(
99              RemovalNotification<BlockCacheKey, CacheablePair> notification) {
100           if (!notification.wasEvicted()) {
101             // Only process removals by eviction, not by replacement or
102             // explicit removal
103             return;
104           }
105           CacheablePair value = notification.getValue();
106           timeSinceLastAccess.set(System.nanoTime()
107               - value.recentlyAccessed.get());
108           stats.evict();
109           doEviction(notification.getKey(), value);
110         }
111       };
112 
113     backingMap = CacheBuilder.newBuilder()
114         .maximumSize(numBlocks - 1)
115         .removalListener(listener)
116         .<BlockCacheKey, CacheablePair>build()
117         .asMap();
118 
119 
120   }
121 
122   @Override
123   public void cacheBlock(BlockCacheKey blockName, Cacheable toBeCached) {
124     ByteBuffer storedBlock;
125 
126     try {
127       storedBlock = backingStore.alloc(toBeCached.getSerializedLength());
128     } catch (InterruptedException e) {
129       LOG.warn("SlabAllocator was interrupted while waiting for block to become available");
130       LOG.warn(e);
131       return;
132     }
133 
134     CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
135         storedBlock);
136     toBeCached.serialize(storedBlock);
137 
138     synchronized (this) {
139       CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
140     
141 
142       if (alreadyCached != null) {
143         backingStore.free(storedBlock);
144         throw new RuntimeException("already cached " + blockName);
145       }
146       if (actionWatcher != null) {
147         actionWatcher.onInsertion(blockName, this);
148       }
149     }
150     newEntry.recentlyAccessed.set(System.nanoTime());
151     this.size.addAndGet(newEntry.heapSize());
152   }
153 
154   @Override
155   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
156     CacheablePair contentBlock = backingMap.get(key);
157     if (contentBlock == null) {
158       if (!repeat) stats.miss(caching);
159       return null;
160     }
161 
162     stats.hit(caching);
163     // If lock cannot be obtained, that means we're undergoing eviction.
164     try {
165       contentBlock.recentlyAccessed.set(System.nanoTime());
166       synchronized (contentBlock) {
167         if (contentBlock.serializedData == null) {
168           // concurrently evicted
169           LOG.warn("Concurrent eviction of " + key);
170           return null;
171         }
172         return contentBlock.deserializer
173             .deserialize(contentBlock.serializedData.asReadOnlyBuffer());
174       }
175     } catch (Throwable t) {
176       LOG.error("Deserializer threw an exception. This may indicate a bug.", t);
177       return null;
178     }
179   }
180 
181   /**
182    * Evicts the block
183    *
184    * @param key the key of the entry we are going to evict
185    * @return the evicted ByteBuffer
186    */
187   public boolean evictBlock(BlockCacheKey key) {
188     stats.evict();
189     CacheablePair evictedBlock = backingMap.remove(key);
190 
191     if (evictedBlock != null) {
192       doEviction(key, evictedBlock);
193     }
194     return evictedBlock != null;
195 
196   }
197 
198   private void doEviction(BlockCacheKey key, CacheablePair evictedBlock) {
199     long evictedHeap = 0;
200     synchronized (evictedBlock) {
201       if (evictedBlock.serializedData == null) {
202         // someone else already freed
203         return;
204       }
205       evictedHeap = evictedBlock.heapSize();
206       ByteBuffer bb = evictedBlock.serializedData;
207       evictedBlock.serializedData = null;
208       backingStore.free(bb);
209 
210       // We have to do this callback inside the synchronization here.
211       // Otherwise we can have the following interleaving:
212       // Thread A calls getBlock():
213       // SlabCache directs call to this SingleSizeCache
214       // It gets the CacheablePair object
215       // Thread B runs eviction
216       // doEviction() is called and sets serializedData = null, here.
217       // Thread A sees the null serializedData, and returns null
218       // Thread A calls cacheBlock on the same block, and gets
219       // "already cached" since the block is still in backingStore
220 
221       if (actionWatcher != null) {
222         actionWatcher.onEviction(key, this);
223       }
224     }
225     stats.evicted();
226     size.addAndGet(-1 * evictedHeap);
227   }
228 
229   public void logStats() {
230 
231     long milliseconds = this.timeSinceLastAccess.get() / 1000000;
232 
233     LOG.info("For Slab of size " + this.blockSize + ": "
234         + this.getOccupiedSize() / this.blockSize
235         + " occupied, out of a capacity of " + this.numBlocks
236         + " blocks. HeapSize is "
237         + StringUtils.humanReadableInt(this.heapSize()) + " bytes." + ", "
238         + "churnTime=" + StringUtils.formatTime(milliseconds));
239 
240     LOG.info("Slab Stats: " + "accesses="
241         + stats.getRequestCount()
242         + ", "
243         + "hits="
244         + stats.getHitCount()
245         + ", "
246         + "hitRatio="
247         + (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(
248             stats.getHitRatio(), 2) + "%, "))
249         + "cachingAccesses="
250         + stats.getRequestCachingCount()
251         + ", "
252         + "cachingHits="
253         + stats.getHitCachingCount()
254         + ", "
255         + "cachingHitsRatio="
256         + (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(
257             stats.getHitCachingRatio(), 2) + "%, ")) + "evictions="
258         + stats.getEvictionCount() + ", " + "evicted="
259         + stats.getEvictedCount() + ", " + "evictedPerRun="
260         + stats.evictedPerEviction());
261 
262   }
263 
264   public void shutdown() {
265     backingStore.shutdown();
266   }
267 
268   public long heapSize() {
269     return this.size.get() + backingStore.heapSize();
270   }
271 
272   public long size() {
273     return (long) this.blockSize * (long) this.numBlocks;
274   }
275 
276   public long getFreeSize() {
277     return (long) backingStore.getBlocksRemaining() * (long) blockSize;
278   }
279 
280   public long getOccupiedSize() {
281     return (long) (numBlocks - backingStore.getBlocksRemaining()) * (long) blockSize;
282   }
283 
284   public long getEvictedCount() {
285     return stats.getEvictedCount();
286   }
287 
288   public CacheStats getStats() {
289     return this.stats;
290   }
291 
292   @Override
293   public long getBlockCount() {
294     return numBlocks - backingStore.getBlocksRemaining();
295   }
296 
297   /* Since its offheap, it doesn't matter if its in memory or not */
298   @Override
299   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
300     this.cacheBlock(cacheKey, buf);
301   }
302 
303   /*
304    * This is never called, as evictions are handled in the SlabCache layer,
305    * implemented in the event we want to use this as a standalone cache.
306    */
307   @Override
308   public int evictBlocksByHfileName(String hfileName) {
309     int evictedCount = 0;
310     for (BlockCacheKey e : backingMap.keySet()) {
311       if (e.getHfileName().equals(hfileName)) {
312         this.evictBlock(e);
313       }
314     }
315     return evictedCount;
316   }
317 
318   @Override
319   public long getCurrentSize() {
320     return 0;
321   }
322 
323   /*
324    * Not implemented. Extremely costly to do this from the off heap cache, you'd
325    * need to copy every object on heap once
326    */
327   @Override
328   public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
329       Configuration conf) {
330     throw new UnsupportedOperationException();
331   }
332 
333   /* Just a pair class, holds a reference to the parent cacheable */
334   private class CacheablePair implements HeapSize {
335     final CacheableDeserializer<Cacheable> deserializer;
336     ByteBuffer serializedData;
337     AtomicLong recentlyAccessed;
338 
339     private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
340         ByteBuffer serializedData) {
341       this.recentlyAccessed = new AtomicLong();
342       this.deserializer = deserializer;
343       this.serializedData = serializedData;
344     }
345 
346     /*
347      * Heapsize overhead of this is the default object overhead, the heapsize of
348      * the serialized object, and the cost of a reference to the bytebuffer,
349      * which is already accounted for in SingleSizeCache
350      */
351     @Override
352     public long heapSize() {
353       return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE * 3
354           + ClassSize.ATOMIC_LONG);
355     }
356   }
357 }