View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io.hfile.slab;
21  
22  import java.math.BigDecimal;
23  import java.util.List;
24  import java.util.Map.Entry;
25  import java.util.TreeMap;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.io.HeapSize;
37  import org.apache.hadoop.hbase.io.hfile.BlockCache;
38  import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
39  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
40  import org.apache.hadoop.hbase.io.hfile.CacheStats;
41  import org.apache.hadoop.hbase.io.hfile.Cacheable;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.hbase.util.HasThread;
44  import org.apache.hadoop.util.StringUtils;
45  
46  import com.google.common.base.Preconditions;
47  import com.google.common.util.concurrent.ThreadFactoryBuilder;
48  
49  /**
50   * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap in
51   * order to determine where a given element fits. Redirects gets and puts to the
52   * correct SingleSizeCache.
53   *
54   **/
@InterfaceAudience.Private
public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {

  // Maps each cached block key to the SingleSizeCache that currently holds it;
  // maintained via the onInsertion/onEviction callbacks.
  private final ConcurrentHashMap<BlockCacheKey, SingleSizeCache> backingStore;
  // Maps slab block size -> slab; TreeMap so we can find the smallest slab
  // whose block size fits a given buffer (see getHigherBlock).
  private final TreeMap<Integer, SingleSizeCache> sizer;
  static final Log LOG = LogFactory.getLog(SlabCache.class);
  // Interval, in seconds, between statistics dumps (5 minutes).
  static final int STAT_THREAD_PERIOD_SECS = 60 * 5;

  // Single daemon thread that periodically logs cache statistics.
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Slab Statistics #%d").build());

  // Total bytes allocated to this cache.
  long size;
  private final CacheStats stats;
  // Histogram of all requested block sizes.
  final SlabStats requestStats;
  // Histogram of block sizes that were actually cached (fit in some slab).
  final SlabStats successfullyCachedStats;
  private final long avgBlockSize;
  private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
      SlabCache.class, false);
73  
74    /**
75     * Default constructor, creates an empty SlabCache.
76     *
77     * @param size Total size allocated to the SlabCache. (Bytes)
78     * @param avgBlockSize Average size of a block being cached.
79     **/
80  
81    public SlabCache(long size, long avgBlockSize) {
82      this.avgBlockSize = avgBlockSize;
83      this.size = size;
84      this.stats = new CacheStats();
85      this.requestStats = new SlabStats();
86      this.successfullyCachedStats = new SlabStats();
87  
88      backingStore = new ConcurrentHashMap<BlockCacheKey, SingleSizeCache>();
89      sizer = new TreeMap<Integer, SingleSizeCache>();
90      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
91          STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
92  
93    }
94  
95    /**
96     * A way of allocating the desired amount of Slabs of each particular size.
97     *
98     * This reads two lists from conf, hbase.offheap.slab.proportions and
99     * hbase.offheap.slab.sizes.
100    *
101    * The first list is the percentage of our total space we allocate to the
102    * slabs.
103    *
104    * The second list is blocksize of the slabs in bytes. (E.g. the slab holds
105    * blocks of this size).
106    *
107    * @param conf Configuration file.
108    */
109   public void addSlabByConf(Configuration conf) {
110     // Proportions we allocate to each slab of the total size.
111     String[] porportions = conf.getStrings(
112         "hbase.offheapcache.slab.proportions", "0.80", "0.20");
113     String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes",
114         Long.valueOf(avgBlockSize * 11 / 10).toString(),
115         Long.valueOf(avgBlockSize * 21 / 10).toString());
116 
117     if (porportions.length != sizes.length) {
118       throw new IllegalArgumentException(
119           "SlabCache conf not "
120               + "initialized, error in configuration. hbase.offheap.slab.proportions specifies "
121               + porportions.length
122               + " slabs while hbase.offheap.slab.sizes specifies "
123               + sizes.length + " slabs "
124               + "offheapslabporportions and offheapslabsizes");
125     }
126     /*
127      * We use BigDecimals instead of floats because float rounding is annoying
128      */
129 
130     BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(porportions);
131     BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);
132 
133     BigDecimal sumProportions = new BigDecimal(0);
134     for (BigDecimal b : parsedProportions) {
135       /* Make sure all proportions are greater than 0 */
136       Preconditions
137           .checkArgument(b.compareTo(BigDecimal.ZERO) == 1,
138               "Proportions in hbase.offheap.slab.proportions must be greater than 0!");
139       sumProportions = sumProportions.add(b);
140     }
141 
142     /* If the sum is greater than 1 */
143     Preconditions
144         .checkArgument(sumProportions.compareTo(BigDecimal.ONE) != 1,
145             "Sum of all proportions in hbase.offheap.slab.proportions must be less than 1");
146 
147     /* If the sum of all proportions is less than 0.99 */
148     if (sumProportions.compareTo(new BigDecimal("0.99")) == -1) {
149       LOG.warn("Sum of hbase.offheap.slab.proportions is less than 0.99! Memory is being wasted");
150     }
151     for (int i = 0; i < parsedProportions.length; i++) {
152       int blockSize = parsedSizes[i].intValue();
153       int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
154           .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
155       addSlab(blockSize, numBlocks);
156     }
157   }
158 
159   /**
160    * Gets the size of the slab cache a ByteBuffer of this size would be
161    * allocated to.
162    *
163    * @param size Size of the ByteBuffer we are checking.
164    *
165    * @return the Slab that the above bytebuffer would be allocated towards. If
166    *         object is too large, returns null.
167    */
168   Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
169     return sizer.higherEntry(size - 1);
170   }
171 
172   private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
173     BigDecimal[] parsed = new BigDecimal[parsee.length];
174     for (int i = 0; i < parsee.length; i++) {
175       parsed[i] = new BigDecimal(parsee[i].trim());
176     }
177     return parsed;
178   }
179 
180   private void addSlab(int blockSize, int numBlocks) {
181     LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks
182         + " blocks, " + StringUtils.humanReadableInt(blockSize * (long) numBlocks) + "bytes.");
183     sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
184   }
185 
186   /**
187    * Cache the block with the specified key and buffer. First finds what size
188    * SingleSlabCache it should fit in. If the block doesn't fit in any, it will
189    * return without doing anything.
190    * <p>
191    * It is assumed this will NEVER be called on an already cached block. If that
192    * is done, it is assumed that you are reinserting the same exact block due to
193    * a race condition, and will throw a runtime exception.
194    *
195    * @param cacheKey block cache key
196    * @param cachedItem block buffer
197    */
198   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) {
199     Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
200         .getSerializedLength());
201 
202     this.requestStats.addin(cachedItem.getSerializedLength());
203 
204     if (scacheEntry == null) {
205       return; // we can't cache, something too big.
206     }
207 
208     this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
209     SingleSizeCache scache = scacheEntry.getValue();
210 
211     /*
212      * This will throw a runtime exception if we try to cache the same value
213      * twice
214      */
215     scache.cacheBlock(cacheKey, cachedItem);
216   } 
217 
218   /**
219    * We don't care about whether its in memory or not, so we just pass the call
220    * through.
221    */
222   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
223     cacheBlock(cacheKey, buf);
224   }
225 
226   public CacheStats getStats() {
227     return this.stats;
228   }
229 
230   /**
231    * Get the buffer of the block with the specified name.
232    *
233    * @return buffer of specified block name, or null if not in cache
234    */
235   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
236     SingleSizeCache cachedBlock = backingStore.get(key);
237     if (cachedBlock == null) {
238       if (!repeat) stats.miss(caching);
239       return null;
240     }
241 
242     Cacheable contentBlock = cachedBlock.getBlock(key, caching, false);
243 
244     if (contentBlock != null) {
245       stats.hit(caching);
246     } else if (!repeat) {
247       stats.miss(caching);
248     }
249     return contentBlock;
250   }
251 
252   /**
253    * Evicts a block from the cache. This is public, and thus contributes to the
254    * the evict counter.
255    */
256   public boolean evictBlock(BlockCacheKey cacheKey) {
257     SingleSizeCache cacheEntry = backingStore.get(cacheKey);
258     if (cacheEntry == null) {
259       return false;
260     } else {
261       cacheEntry.evictBlock(cacheKey);
262       return true;
263     }
264   }
265 
  // Callback from a child SingleSizeCache: count the eviction, then drop the
  // key -> slab mapping so subsequent gets miss.
  @Override
  public void onEviction(BlockCacheKey key, SingleSizeCache notifier) {
    stats.evicted();
    backingStore.remove(key);
  }
271   
  // Callback from a child SingleSizeCache: record which slab now holds this
  // key so gets and evictions can be routed to it.
  @Override
  public void onInsertion(BlockCacheKey key, SingleSizeCache notifier) {
    backingStore.put(key, notifier);
  }
276 
277   /**
278    * Sends a shutdown to all SingleSizeCache's contained by this cache.
279    *
280    * Also terminates the scheduleThreadPool.
281    */
282   public void shutdown() {
283     for (SingleSizeCache s : sizer.values()) {
284       s.shutdown();
285     }
286     this.scheduleThreadPool.shutdown();
287   }
288 
289   public long heapSize() {
290     long childCacheSize = 0;
291     for (SingleSizeCache s : sizer.values()) {
292       childCacheSize += s.heapSize();
293     }
294     return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
295   }
296 
297   public long size() {
298     return this.size;
299   }
300 
301   public long getFreeSize() {
302     long childFreeSize = 0;
303     for (SingleSizeCache s : sizer.values()) {
304       childFreeSize += s.getFreeSize();
305     }
306     return childFreeSize;
307   }
308 
309   @Override
310   public long getBlockCount() {
311     long count = 0;
312     for (SingleSizeCache cache : sizer.values()) {
313       count += cache.getBlockCount();
314     }
315     return count;
316   }
317 
318   public long getCurrentSize() {
319     return size;
320   }
321 
322   public long getEvictedCount() {
323     return stats.getEvictedCount();
324   }
325 
326   /*
327    * Statistics thread. Periodically prints the cache statistics to the log.
328    */
329   static class StatisticsThread extends HasThread {
330     SlabCache ourcache;
331 
332     public StatisticsThread(SlabCache slabCache) {
333       super("SlabCache.StatisticsThread");
334       setDaemon(true);
335       this.ourcache = slabCache;
336     }
337 
338     @Override
339     public void run() {
340       for (SingleSizeCache s : ourcache.sizer.values()) {
341         s.logStats();
342       }
343 
344       SlabCache.LOG.info("Current heap size is: "
345           + StringUtils.humanReadableInt(ourcache.heapSize()));
346 
347       LOG.info("Request Stats");
348       ourcache.requestStats.logStats();
349       LOG.info("Successfully Cached Stats");
350       ourcache.successfullyCachedStats.logStats();
351     }
352 
353   }
354 
355   /**
356    * Just like CacheStats, but more Slab specific. Finely grained profiling of
357    * sizes we store using logs.
358    *
359    */
360   static class SlabStats {
361     // the maximum size somebody will ever try to cache, then we multiply by
362     // 10
363     // so we have finer grained stats.
364     static final int MULTIPLIER = 10;
365     final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
366     private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];
367 
368     public SlabStats() {
369       for (int i = 0; i < NUMDIVISIONS; i++) {
370         counts[i] = new AtomicLong();
371       }
372     }
373 
374     public void addin(int size) {
375       int index = (int) (Math.log(size) * MULTIPLIER);
376       counts[index].incrementAndGet();
377     }
378 
379     public AtomicLong[] getUsage() {
380       return counts;
381     }
382 
383     double getUpperBound(int index) {
384       return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
385     }
386 
387     double getLowerBound(int index) {
388       return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
389     }
390 
391     public void logStats() {
392       AtomicLong[] fineGrainedStats = getUsage();
393       for (int i = 0; i < fineGrainedStats.length; i++) {
394 
395         if (fineGrainedStats[i].get() > 0) {
396           SlabCache.LOG.info("From  "
397               + StringUtils.humanReadableInt((long) getLowerBound(i)) + "- "
398               + StringUtils.humanReadableInt((long) getUpperBound(i)) + ": "
399               + StringUtils.humanReadableInt(fineGrainedStats[i].get())
400               + " requests");
401 
402         }
403       }
404     }
405   }
406 
407   public int evictBlocksByHfileName(String hfileName) {
408     int numEvicted = 0;
409     for (BlockCacheKey key : backingStore.keySet()) {
410       if (key.getHfileName().equals(hfileName)) {
411         if (evictBlock(key))
412           ++numEvicted;
413       }
414     }
415     return numEvicted;
416   }
417 
  /*
   * Not implemented. Extremely costly to do this from the off heap cache, you'd
   * need to copy every object on heap once
   */
  @Override
  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
      Configuration conf) {
    // Unconditionally unsupported: summarizing would require copying every
    // off-heap block onto the heap.
    throw new UnsupportedOperationException();
  }
427 
428 }