/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile.slab;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap to
 * determine which SingleSizeCache a given element fits into, and redirects
 * gets and puts to that cache.
 *
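 * A typical setup (a minimal sketch; the size and block-size values here are
 * illustrative, not defaults):
 * <pre>
 *   SlabCache cache = new SlabCache(1024 * 1024 * 1024L, 64 * 1024L);
 *   cache.addSlabByConf(conf); // carves the space into slabs per the conf
 * </pre>
 *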
 */
public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {

  private final ConcurrentHashMap<BlockCacheKey, SingleSizeCache> backingStore;
  private final TreeMap<Integer, SingleSizeCache> sizer;
  static final Log LOG = LogFactory.getLog(SlabCache.class);
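  // How often the statistics thread logs: every 5 minutes (300 seconds).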
  static final int STAT_THREAD_PERIOD_SECS = 60 * 5;

  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Slab Statistics #%d").build());

  long size;
  private final CacheStats stats;
  final SlabStats requestStats;
  final SlabStats successfullyCachedStats;
  private final long avgBlockSize;
  private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
      SlabCache.class, false);

  /**
   * Constructs an empty SlabCache.
   *
   * @param size total size allocated to the SlabCache, in bytes
   * @param avgBlockSize average size of a block being cached, in bytes
   */
  public SlabCache(long size, long avgBlockSize) {
    this.avgBlockSize = avgBlockSize;
    this.size = size;
    this.stats = new CacheStats();
    this.requestStats = new SlabStats();
    this.successfullyCachedStats = new SlabStats();

    backingStore = new ConcurrentHashMap<BlockCacheKey, SingleSizeCache>();
    sizer = new TreeMap<Integer, SingleSizeCache>();
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
  }

  /**
   * Allocates the desired number of slabs of each particular size.
   *
   * This reads two lists from conf: hbase.offheapcache.slab.proportions and
   * hbase.offheapcache.slab.sizes.
   *
   * The first list gives the proportion of the total space allocated to each
   * slab.
   *
   * The second list gives the block size of each slab in bytes (i.e. each
   * slab holds blocks of this size).
   *
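   * For example, in hbase-site.xml (the values are illustrative, not
   * defaults):
   * <pre>
   *   &lt;property&gt;
   *     &lt;name&gt;hbase.offheapcache.slab.proportions&lt;/name&gt;
   *     &lt;value&gt;0.80,0.20&lt;/value&gt;
   *   &lt;/property&gt;
   *   &lt;property&gt;
   *     &lt;name&gt;hbase.offheapcache.slab.sizes&lt;/name&gt;
   *     &lt;value&gt;72704,139264&lt;/value&gt;
   *   &lt;/property&gt;
   * </pre>
   *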
   * @param conf configuration to read the slab proportions and sizes from
   */
  public void addSlabByConf(Configuration conf) {
    // Proportions of the total size we allocate to each slab.
    String[] proportions = conf.getStrings(
        "hbase.offheapcache.slab.proportions", "0.80", "0.20");
    String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes",
        Long.valueOf(avgBlockSize * 11 / 10).toString(),
        Long.valueOf(avgBlockSize * 21 / 10).toString());

    if (proportions.length != sizes.length) {
      throw new IllegalArgumentException(
          "SlabCache conf not initialized: hbase.offheapcache.slab.proportions specifies "
              + proportions.length
              + " slabs while hbase.offheapcache.slab.sizes specifies "
              + sizes.length + " slabs");
    }
    /*
     * We use BigDecimals instead of floats because float rounding makes the
     * proportion checks below unreliable.
     */
    BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(proportions);
    BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);

    BigDecimal sumProportions = new BigDecimal(0);
    for (BigDecimal b : parsedProportions) {
      /* Make sure all proportions are greater than 0 */
      Preconditions.checkArgument(b.compareTo(BigDecimal.ZERO) > 0,
          "Proportions in hbase.offheapcache.slab.proportions must be greater than 0!");
      sumProportions = sumProportions.add(b);
    }

    /* The sum of the proportions must not exceed 1 */
    Preconditions.checkArgument(sumProportions.compareTo(BigDecimal.ONE) <= 0,
        "Sum of all proportions in hbase.offheapcache.slab.proportions must not exceed 1");

    /* Warn if the sum of all proportions is less than 0.99 */
    if (sumProportions.compareTo(new BigDecimal("0.99")) < 0) {
      LOG.warn("Sum of hbase.offheapcache.slab.proportions is less than 0.99! Memory is being wasted");
    }
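    /*
     * Each slab gets numBlocks = floor(totalSize * proportion / blockSize)
     * blocks. For example, a 1 GiB cache with proportion 0.80 and a
     * 65536-byte block size yields floor(1073741824 * 0.80 / 65536) = 13107
     * blocks.
     */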
    for (int i = 0; i < parsedProportions.length; i++) {
      int blockSize = parsedSizes[i].intValue();
      int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
          .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
      addSlab(blockSize, numBlocks);
    }
  }

  /**
   * Gets the slab that a ByteBuffer of the given size would be allocated to.
   *
   * @param size size of the ByteBuffer we are checking, in bytes
   *
   * @return the entry for the slab that the above ByteBuffer would be
   *         allocated to, or null if the object is too large to fit any slab
   */
  Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
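    // higherEntry(size - 1) returns the entry with the least key strictly
    // greater than size - 1, i.e. the smallest slab whose block size is >=
    // size. E.g. with slabs keyed at 4096 and 8192 bytes, a 5000-byte buffer
    // maps to the 8192-byte slab and a 4096-byte buffer to the 4096-byte one.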
    return sizer.higherEntry(size - 1);
  }

  private BigDecimal[] stringArrayToBigDecimalArray(String[] strings) {
    BigDecimal[] parsed = new BigDecimal[strings.length];
    for (int i = 0; i < strings.length; i++) {
      parsed[i] = new BigDecimal(strings[i].trim());
    }
    return parsed;
  }

  private void addSlab(int blockSize, int numBlocks) {
    LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks
        + " blocks.");
    sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
  }

  /**
   * Cache the block with the specified key and buffer. First finds the
   * smallest SingleSizeCache the block fits into; if the block fits in none,
   * it returns without doing anything.
   * <p>
   * This must NEVER be called on an already cached block. If it is, we assume
   * the exact same block is being reinserted due to a race condition, and a
   * runtime exception is thrown.
   *
   * @param cacheKey block cache key
   * @param cachedItem block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) {
    Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
        .getSerializedLength());

    this.requestStats.addin(cachedItem.getSerializedLength());

    if (scacheEntry == null) {
      return; // we can't cache: the block is too big for any slab.
    }

    this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
    SingleSizeCache scache = scacheEntry.getValue();

    /*
     * This will throw a runtime exception if we try to cache the same value
     * twice.
     */
    scache.cacheBlock(cacheKey, cachedItem);
  }

  /**
   * We don't care whether it's in memory or not, so we just pass the call
   * through.
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  public CacheStats getStats() {
    return this.stats;
  }

  /**
   * Get the buffer of the block with the specified key.
   *
   * @param key block cache key
   * @param caching true if the caller caches blocks on misses (used for stats)
   * @param repeat whether this is a repeat lookup for the same block; repeats
   *          are not counted as misses
   *
   * @return buffer of the specified block, or null if not in cache
   */
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
    SingleSizeCache cachedBlock = backingStore.get(key);
    if (cachedBlock == null) {
      if (!repeat) stats.miss(caching);
      return null;
    }

    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false);

    if (contentBlock != null) {
      stats.hit(caching);
    } else {
      if (!repeat) stats.miss(caching);
    }
    return contentBlock;
  }

  /**
   * Evicts a block from the cache. This is public, and thus contributes to
   * the evict counter.
   */
  public boolean evictBlock(BlockCacheKey cacheKey) {
    SingleSizeCache cacheEntry = backingStore.get(cacheKey);
    if (cacheEntry == null) {
      return false;
    } else {
      cacheEntry.evictBlock(cacheKey);
      return true;
    }
  }

  @Override
  public void onEviction(BlockCacheKey key, SingleSizeCache notifier) {
    stats.evicted();
    backingStore.remove(key);
  }

  @Override
  public void onInsertion(BlockCacheKey key, SingleSizeCache notifier) {
    backingStore.put(key, notifier);
  }

  /**
   * Shuts down every SingleSizeCache contained by this cache, and terminates
   * the scheduleThreadPool.
   */
  public void shutdown() {
    for (SingleSizeCache s : sizer.values()) {
      s.shutdown();
    }
    this.scheduleThreadPool.shutdown();
  }

  public long heapSize() {
    long childCacheSize = 0;
    for (SingleSizeCache s : sizer.values()) {
      childCacheSize += s.heapSize();
    }
    return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
  }

  public long size() {
    return this.size;
  }

  public long getFreeSize() {
    return 0; // this cache, by default, allocates all its space.
  }

  @Override
  public long getBlockCount() {
    long count = 0;
    for (SingleSizeCache cache : backingStore.values()) {
      count += cache.getBlockCount();
    }
    return count;
  }

  public long getCurrentSize() {
    return size;
  }

  public long getEvictedCount() {
    return stats.getEvictedCount();
  }

  /*
   * Statistics thread. Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends HasThread {
    SlabCache ourcache;

    public StatisticsThread(SlabCache slabCache) {
      super("SlabCache.StatisticsThread");
      setDaemon(true);
      this.ourcache = slabCache;
    }

    @Override
    public void run() {
      for (SingleSizeCache s : ourcache.sizer.values()) {
        s.logStats();
      }

      SlabCache.LOG.info("Current heap size is: "
          + StringUtils.humanReadableInt(ourcache.heapSize()));

      LOG.info("Request Stats");
      ourcache.requestStats.logStats();
      LOG.info("Successfully Cached Stats");
      ourcache.successfullyCachedStats.logStats();
    }
  }

  /**
   * Just like CacheStats, but slab-specific: fine-grained profiling of the
   * sizes we cache, bucketed on a logarithmic scale.
   */
  static class SlabStats {
    // Buckets are indexed by floor(ln(size) * MULTIPLIER); taking the natural
    // log of the maximum size anybody will ever try to cache and multiplying
    // by 10 gives us finer-grained buckets.
    final int MULTIPLIER = 10;
    final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
    private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];

    public SlabStats() {
      for (int i = 0; i < NUMDIVISIONS; i++) {
        counts[i] = new AtomicLong();
      }
    }

    public void addin(int size) {
      int index = (int) (Math.log(size) * MULTIPLIER);
      counts[index].incrementAndGet();
    }

    public AtomicLong[] getUsage() {
      return counts;
    }

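    // Bucket i covers sizes from e^((i - 0.5) / MULTIPLIER) to
    // e^((i + 0.5) / MULTIPLIER): a half-bucket on either side of the
    // bucket's center, e^(i / MULTIPLIER).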
    double getUpperBound(int index) {
      return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
    }

    double getLowerBound(int index) {
      return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
    }

    public void logStats() {
      AtomicLong[] fineGrainedStats = getUsage();
      for (int i = 0; i < fineGrainedStats.length; i++) {
        if (fineGrainedStats[i].get() > 0) {
          SlabCache.LOG.info("From "
              + StringUtils.humanReadableInt((long) getLowerBound(i)) + " to "
              + StringUtils.humanReadableInt((long) getUpperBound(i)) + ": "
              + StringUtils.humanReadableInt(fineGrainedStats[i].get())
              + " requests");
        }
      }
    }
  }


  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = 0;
    for (BlockCacheKey key : backingStore.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key)) {
          ++numEvicted;
        }
      }
    }
    return numEvicted;
  }

  /*
   * Not implemented. Doing this from the off-heap cache would be extremely
   * costly: every object would have to be copied on-heap once.
   */
  @Override
  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
      Configuration conf) {
    throw new UnsupportedOperationException();
  }

}