/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile.slab;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap to
 * determine where a given element fits, and redirects gets and puts to the
 * correct SingleSizeCache.
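 * For example, with 64KB and 128KB slabs, a 70KB block would be routed to the
 * 128KB slab, the smallest one whose block size can hold it.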
 *
 * <p>It is configured with a call to {@link #addSlabByConf(Configuration)}.
 *
 * @deprecated As of 1.0, replaced by {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache}.
 */
@InterfaceAudience.Private
@Deprecated
public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {
  private final ConcurrentHashMap<BlockCacheKey, SingleSizeCache> backingStore;
  private final TreeMap<Integer, SingleSizeCache> slabs;
  static final Log LOG = LogFactory.getLog(SlabCache.class);
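  // How often the statistics thread logs, in seconds: every five minutes.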
  static final int STAT_THREAD_PERIOD_SECS = 60 * 5;

  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Slab Statistics #%d").build());

  long size;
  private final CacheStats stats;
  final SlabStats requestStats;
  final SlabStats successfullyCachedStats;
  private final long avgBlockSize;
  private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
      SlabCache.class, false);

  /**
   * Configuration key for the list of proportions of our total space to
   * allocate to each slab.  Defaults: "0.80", "0.20".
   * @see #SLAB_CACHE_SIZES_KEY which must have a corresponding number of elements.
   */
  static final String SLAB_CACHE_PROPORTIONS_KEY = "hbase.offheapcache.slab.proportions";

  /**
   * Configuration key for the list of slab block sizes in bytes (i.e. each
   * slab holds blocks of this size).
   * Defaults: avgBlockSize * 11 / 10, avgBlockSize * 21 / 10.
   * @see #SLAB_CACHE_PROPORTIONS_KEY
   */
  static final String SLAB_CACHE_SIZES_KEY = "hbase.offheapcache.slab.sizes";

  /**
   * Creates an empty SlabCache.
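   *
   * <p>A minimal usage sketch; the sizes are illustrative and {@code conf} is
   * assumed to be an existing {@link Configuration}:
   * <pre>
   * // 512MB of off-heap cache, tuned for 64KB blocks
   * SlabCache cache = new SlabCache(512L * 1024 * 1024, 64 * 1024);
   * cache.addSlabByConf(conf); // carve the space into slabs per configuration
   * </pre>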
   *
   * @param size Total size allocated to the SlabCache, in bytes.
   * @param avgBlockSize Average size of a block being cached.
   */
  public SlabCache(long size, long avgBlockSize) {
    this.avgBlockSize = avgBlockSize;
    this.size = size;
    this.stats = new CacheStats();
    this.requestStats = new SlabStats();
    this.successfullyCachedStats = new SlabStats();

    backingStore = new ConcurrentHashMap<BlockCacheKey, SingleSizeCache>();
    slabs = new TreeMap<Integer, SingleSizeCache>();
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
  }

  public Map<Integer, SingleSizeCache> getSizer() {
    return slabs;
  }

  /**
   * Allocates the desired number of slabs of each configured size.
   *
   * This reads two lists from conf: hbase.offheapcache.slab.proportions and
   * hbase.offheapcache.slab.sizes.
   *
   * The first list is the proportion of our total space we allocate to each
   * slab.
   *
   * The second list is the block size of each slab in bytes (i.e. each slab
   * holds blocks of this size).
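   *
   * <p>For example, the defaults correspond to this (illustrative) configuration:
   * <pre>
   * hbase.offheapcache.slab.proportions = 0.80,0.20
   * hbase.offheapcache.slab.sizes = &lt;avgBlockSize * 11 / 10&gt;,&lt;avgBlockSize * 21 / 10&gt;
   * </pre>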
   *
   * @param conf Configuration to read the slab proportions and sizes from.
   */
  public void addSlabByConf(Configuration conf) {
    // Proportions of the total size we allocate to each slab.
    String[] proportions = conf.getStrings(SLAB_CACHE_PROPORTIONS_KEY, "0.80", "0.20");
    String[] sizes = conf.getStrings(SLAB_CACHE_SIZES_KEY,
        Long.valueOf(avgBlockSize * 11 / 10).toString(),
        Long.valueOf(avgBlockSize * 21 / 10).toString());

    if (proportions.length != sizes.length) {
      throw new IllegalArgumentException(
          "SlabCache conf not initialized, error in configuration. "
              + SLAB_CACHE_PROPORTIONS_KEY + " specifies " + proportions.length
              + " slabs while " + SLAB_CACHE_SIZES_KEY + " specifies "
              + sizes.length + " slabs; the two lists must be the same length.");
    }

    /*
     * We use BigDecimals instead of floats because float rounding is annoying.
     */
    BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(proportions);
    BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);

    BigDecimal sumProportions = new BigDecimal(0);
    for (BigDecimal b : parsedProportions) {
      /* Make sure all proportions are greater than 0. */
      Preconditions.checkArgument(b.compareTo(BigDecimal.ZERO) > 0,
          "Proportions in " + SLAB_CACHE_PROPORTIONS_KEY + " must be greater than 0!");
      sumProportions = sumProportions.add(b);
    }

    /* The proportions must not sum to more than 1. */
    Preconditions.checkArgument(sumProportions.compareTo(BigDecimal.ONE) <= 0,
        "Sum of all proportions in " + SLAB_CACHE_PROPORTIONS_KEY + " must not exceed 1");

    /* Warn if the sum of all proportions is less than 0.99: space goes unused. */
    if (sumProportions.compareTo(new BigDecimal("0.99")) < 0) {
      LOG.warn("Sum of " + SLAB_CACHE_PROPORTIONS_KEY
          + " is less than 0.99! Memory is being wasted");
    }
    for (int i = 0; i < parsedProportions.length; i++) {
      int blockSize = parsedSizes[i].intValue();
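      // Blocks this slab can hold: floor(totalSize * proportion / blockSize).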
      int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
          .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
      addSlab(blockSize, numBlocks);
    }
  }

  /**
   * Gets the slab that a ByteBuffer of the given size would be allocated to.
   *
   * @param size Size of the ByteBuffer we are checking.
   *
   * @return the entry for the slab the buffer would be allocated to, or null
   *         if the object is too large to fit in any slab.
   */
  Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
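    // higherEntry(size - 1) returns the entry with the smallest key >= size.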
    return slabs.higherEntry(size - 1);
  }

  private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
    BigDecimal[] parsed = new BigDecimal[parsee.length];
    for (int i = 0; i < parsee.length; i++) {
      parsed[i] = new BigDecimal(parsee[i].trim());
    }
    return parsed;
  }

  private void addSlab(int blockSize, int numBlocks) {
    LOG.info("Creating slab of blockSize " + blockSize + " with " + numBlocks
        + " blocks, " + StringUtils.byteDesc(blockSize * (long) numBlocks) + " bytes.");
    slabs.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
  }

  /**
   * Cache the block with the specified key and buffer. First finds the
   * SingleSizeCache the block should fit in. If the block doesn't fit in any,
   * this method returns without doing anything.
   * <p>
   * It is assumed this will NEVER be called on an already cached block. If it
   * is, we assume you are reinserting the exact same block due to a race
   * condition, and a runtime exception will be thrown.
   *
   * @param cacheKey block cache key
   * @param cachedItem block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) {
    Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
        .getSerializedLength());

    this.requestStats.addin(cachedItem.getSerializedLength());

    if (scacheEntry == null) {
      return; // we can't cache, something too big.
    }

    this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
    SingleSizeCache scache = scacheEntry.getValue();

    /*
     * This will throw a runtime exception if we try to cache the same value
     * twice.
     */
    scache.cacheBlock(cacheKey, cachedItem);
  }

  /**
   * We don't care about whether it's in memory or not, so we just pass the
   * call through.
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlock(cacheKey, buf);
  }

  public CacheStats getStats() {
    return this.stats;
  }

  /**
   * Get the buffer of the block with the specified key.
   *
   * @return buffer of the specified block, or null if not in cache
   */
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    SingleSizeCache cachedBlock = backingStore.get(key);
    if (cachedBlock == null) {
      if (!repeat) stats.miss(caching);
      return null;
    }

    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);

    if (contentBlock != null) {
      if (updateCacheMetrics) stats.hit(caching);
    } else if (!repeat) {
      if (updateCacheMetrics) stats.miss(caching);
    }
    return contentBlock;
  }

  /**
   * Evicts a block from the cache. This is public, and thus contributes to
   * the evict counter.
   */
  public boolean evictBlock(BlockCacheKey cacheKey) {
    SingleSizeCache cacheEntry = backingStore.get(cacheKey);
    if (cacheEntry == null) {
      return false;
    } else {
      cacheEntry.evictBlock(cacheKey);
      return true;
    }
  }

  @Override
  public void onEviction(BlockCacheKey key, SingleSizeCache notifier) {
    stats.evicted();
    backingStore.remove(key);
  }

  @Override
  public void onInsertion(BlockCacheKey key, SingleSizeCache notifier) {
    backingStore.put(key, notifier);
  }

  /**
   * Sends a shutdown to all SingleSizeCaches contained in this cache.
   *
   * Also terminates the scheduleThreadPool.
   */
  public void shutdown() {
    for (SingleSizeCache s : slabs.values()) {
      s.shutdown();
    }
    this.scheduleThreadPool.shutdown();
  }

  public long heapSize() {
    long childCacheSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childCacheSize += s.heapSize();
    }
    return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
  }

  public long size() {
    return this.size;
  }

  public long getFreeSize() {
    long childFreeSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childFreeSize += s.getFreeSize();
    }
    return childFreeSize;
  }

  @Override
  public long getBlockCount() {
    long count = 0;
    for (SingleSizeCache cache : slabs.values()) {
      count += cache.getBlockCount();
    }
    return count;
  }

  public long getCurrentSize() {
    return size;
  }

  public long getEvictedCount() {
    return stats.getEvictedCount();
  }

  /*
   * Statistics thread. Periodically prints the cache statistics to the log.
   * TODO: Fix.  Just emit to metrics.  Don't run a thread just to do a log.
   */
  static class StatisticsThread extends HasThread {
    SlabCache ourcache;

    public StatisticsThread(SlabCache slabCache) {
      super("SlabCache.StatisticsThread");
      setDaemon(true);
      this.ourcache = slabCache;
    }

    @Override
    public void run() {
      for (SingleSizeCache s : ourcache.slabs.values()) {
        s.logStats();
      }

      SlabCache.LOG.info("Current heap size is: "
          + StringUtils.humanReadableInt(ourcache.heapSize()));

      LOG.info("Request Stats");
      ourcache.requestStats.logStats();
      LOG.info("Successfully Cached Stats");
      ourcache.successfullyCachedStats.logStats();
    }
  }

  /**
   * Like CacheStats, but slab-specific: fine-grained profiling, on a log
   * scale, of the sizes we are asked to cache.
   */
  static class SlabStats {
    // Sizes are bucketed by natural log; MULTIPLIER gives ten buckets per
    // factor of e, up to the maximum size anyone will ever try to cache
    // (Integer.MAX_VALUE).
    static final int MULTIPLIER = 10;
    final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
    private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];

    public SlabStats() {
      for (int i = 0; i < NUMDIVISIONS; i++) {
        counts[i] = new AtomicLong();
      }
    }

    public void addin(int size) {
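      // ln(size) * MULTIPLIER maps the size onto its log-scale bucket.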
      int index = (int) (Math.log(size) * MULTIPLIER);
      counts[index].incrementAndGet();
    }

    public AtomicLong[] getUsage() {
      return counts;
    }

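    // Inverse of addin's mapping: bucket i covers sizes from
    // e^((i - 0.5) / MULTIPLIER) to e^((i + 0.5) / MULTIPLIER).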
    double getUpperBound(int index) {
      return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
    }

    double getLowerBound(int index) {
      return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
    }

    public void logStats() {
      AtomicLong[] fineGrainedStats = getUsage();
      for (int i = 0; i < fineGrainedStats.length; i++) {
        if (fineGrainedStats[i].get() > 0) {
          SlabCache.LOG.info("From "
              + StringUtils.humanReadableInt((long) getLowerBound(i)) + " - "
              + StringUtils.humanReadableInt((long) getUpperBound(i)) + ": "
              + StringUtils.humanReadableInt(fineGrainedStats[i].get())
              + " requests");
        }
      }
    }
  }

  public int evictBlocksByHfileName(String hfileName) {
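    // Linear scan of the backing map for keys belonging to this file.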
    int numEvicted = 0;
    for (BlockCacheKey key : backingStore.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key)) {
          ++numEvicted;
        }
      }
    }
    return numEvicted;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramcache since stuff is in here only a little while.
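    // The ConcurrentHashMap iterator is weakly consistent: entries may be
    // added or removed while we iterate, without throwing.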
    final Iterator<Map.Entry<BlockCacheKey, SingleSizeCache>> i =
        this.backingStore.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, SingleSizeCache> e = i.next();
        final Cacheable cacheable = e.getValue().getBlock(e.getKey(), false, false, false);
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return null;
          }

          @Override
          public BlockType getBlockType() {
            // cacheable may be null if the block was evicted after next().
            return cacheable == null ? null : cacheable.getBlockType();
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return cacheable == null ? 0 : cacheable.getSerializedLength();
          }

          @Override
          public long getCachedTime() {
            return -1;
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            // Subtracting long offsets can overflow an int, so compare directly.
            return Long.compare(this.getOffset(), other.getOffset());
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
}