View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
32  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
33  
34  /**
35   * This class is used to allocate a block with specified size and free the block
36   * when evicting. It manages an array of buckets, each bucket is associated with
37   * a size and caches elements up to this size. For completely empty bucket, this
38   * size could be re-specified dynamically.
39   * 
40   * This class is not thread safe.
41   */
42  @InterfaceAudience.Private
43  public final class BucketAllocator {
44    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
45  
46    final private static class Bucket {
47      private long baseOffset;
48      private int itemAllocationSize, sizeIndex;
49      private int itemCount;
50      private int freeList[];
51      private int freeCount, usedCount;
52  
53      public Bucket(long offset) {
54        baseOffset = offset;
55        sizeIndex = -1;
56      }
57  
58      void reconfigure(int sizeIndex) {
59        this.sizeIndex = sizeIndex;
60        assert sizeIndex < BUCKET_SIZES.length;
61        itemAllocationSize = BUCKET_SIZES[sizeIndex];
62        itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize);
63        freeCount = itemCount;
64        usedCount = 0;
65        freeList = new int[itemCount];
66        for (int i = 0; i < freeCount; ++i)
67          freeList[i] = i;
68      }
69  
70      public boolean isUninstantiated() {
71        return sizeIndex == -1;
72      }
73  
74      public int sizeIndex() {
75        return sizeIndex;
76      }
77  
78      public int itemAllocationSize() {
79        return itemAllocationSize;
80      }
81  
82      public boolean hasFreeSpace() {
83        return freeCount > 0;
84      }
85  
86      public boolean isCompletelyFree() {
87        return usedCount == 0;
88      }
89  
90      public int freeCount() {
91        return freeCount;
92      }
93  
94      public int usedCount() {
95        return usedCount;
96      }
97  
98      public int freeBytes() {
99        return freeCount * itemAllocationSize;
100     }
101 
102     public int usedBytes() {
103       return usedCount * itemAllocationSize;
104     }
105 
106     public long baseOffset() {
107       return baseOffset;
108     }
109 
110     /**
111      * Allocate a block in this bucket, return the offset representing the
112      * position in physical space
113      * @return the offset in the IOEngine
114      */
115     public long allocate() {
116       assert freeCount > 0; // Else should not have been called
117       assert sizeIndex != -1;
118       ++usedCount;
119       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
120       assert offset >= 0;
121       return offset;
122     }
123 
124     public void addAllocation(long offset) throws BucketAllocatorException {
125       offset -= baseOffset;
126       if (offset < 0 || offset % itemAllocationSize != 0)
127         throw new BucketAllocatorException(
128             "Attempt to add allocation for bad offset: " + offset + " base="
129                 + baseOffset + ", bucket size=" + itemAllocationSize);
130       int idx = (int) (offset / itemAllocationSize);
131       boolean matchFound = false;
132       for (int i = 0; i < freeCount; ++i) {
133         if (matchFound) freeList[i - 1] = freeList[i];
134         else if (freeList[i] == idx) matchFound = true;
135       }
136       if (!matchFound)
137         throw new BucketAllocatorException("Couldn't find match for index "
138             + idx + " in free list");
139       ++usedCount;
140       --freeCount;
141     }
142 
143     private void free(long offset) {
144       offset -= baseOffset;
145       assert offset >= 0;
146       assert offset < itemCount * itemAllocationSize;
147       assert offset % itemAllocationSize == 0;
148       assert usedCount > 0;
149       assert freeCount < itemCount; // Else duplicate free
150       int item = (int) (offset / (long) itemAllocationSize);
151       assert !freeListContains(item);
152       --usedCount;
153       freeList[freeCount++] = item;
154     }
155 
156     private boolean freeListContains(int blockNo) {
157       for (int i = 0; i < freeCount; ++i) {
158         if (freeList[i] == blockNo) return true;
159       }
160       return false;
161     }
162   }
163 
164   final class BucketSizeInfo {
165     // Free bucket means it has space to allocate a block;
166     // Completely free bucket means it has no block.
167     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
168     private int sizeIndex;
169 
170     BucketSizeInfo(int sizeIndex) {
171       bucketList = new ArrayList<Bucket>();
172       freeBuckets = new ArrayList<Bucket>();
173       completelyFreeBuckets = new ArrayList<Bucket>();
174       this.sizeIndex = sizeIndex;
175     }
176 
177     public void instantiateBucket(Bucket b) {
178       assert b.isUninstantiated() || b.isCompletelyFree();
179       b.reconfigure(sizeIndex);
180       bucketList.add(b);
181       freeBuckets.add(b);
182       completelyFreeBuckets.add(b);
183     }
184 
185     public int sizeIndex() {
186       return sizeIndex;
187     }
188 
189     /**
190      * Find a bucket to allocate a block
191      * @return the offset in the IOEngine
192      */
193     public long allocateBlock() {
194       Bucket b = null;
195       if (freeBuckets.size() > 0) // Use up an existing one first...
196         b = freeBuckets.get(freeBuckets.size() - 1);
197       if (b == null) {
198         b = grabGlobalCompletelyFreeBucket();
199         if (b != null) instantiateBucket(b);
200       }
201       if (b == null) return -1;
202       long result = b.allocate();
203       blockAllocated(b);
204       return result;
205     }
206 
207     void blockAllocated(Bucket b) {
208       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
209       if (!b.hasFreeSpace()) freeBuckets.remove(b);
210     }
211 
212     public Bucket findAndRemoveCompletelyFreeBucket() {
213       Bucket b = null;
214       assert bucketList.size() > 0;
215       if (bucketList.size() == 1) {
216         // So we never get complete starvation of a bucket for a size
217         return null;
218       }
219 
220       if (completelyFreeBuckets.size() > 0) {
221         b = completelyFreeBuckets.get(0);
222         removeBucket(b);
223       }
224       return b;
225     }
226 
227     private void removeBucket(Bucket b) {
228       assert b.isCompletelyFree();
229       bucketList.remove(b);
230       freeBuckets.remove(b);
231       completelyFreeBuckets.remove(b);
232     }
233 
234     public void freeBlock(Bucket b, long offset) {
235       assert bucketList.contains(b);
236       // else we shouldn't have anything to free...
237       assert (!completelyFreeBuckets.contains(b));
238       b.free(offset);
239       if (!freeBuckets.contains(b)) freeBuckets.add(b);
240       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
241     }
242 
243     public IndexStatistics statistics() {
244       long free = 0, used = 0;
245       for (Bucket b : bucketList) {
246         free += b.freeCount();
247         used += b.usedCount();
248       }
249       return new IndexStatistics(free, used, BUCKET_SIZES[sizeIndex]);
250     }
251   }
252 
253   // Default block size is 64K, so we choose more sizes near 64K, you'd better
254   // reset it according to your cluster's block size distribution
255   // TODO Make these sizes configurable
256   // TODO Support the view of block size distribution statistics
257   private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
258       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
259       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
260       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
261       512 * 1024 + 1024 };
262 
263   /**
264    * Round up the given block size to bucket size, and get the corresponding
265    * BucketSizeInfo
266    * @param blockSize
267    * @return BucketSizeInfo
268    */
269   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
270     for (int i = 0; i < BUCKET_SIZES.length; ++i)
271       if (blockSize <= BUCKET_SIZES[i])
272         return bucketSizeInfos[i];
273     return null;
274   }
275 
276   static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 513K plus overhead
277   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
278   // The capacity size for each bucket
279   static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE;
280 
281   private Bucket[] buckets;
282   private BucketSizeInfo[] bucketSizeInfos;
283   private final long totalSize;
284   private long usedSize = 0;
285 
286   BucketAllocator(long availableSpace) throws BucketAllocatorException {
287     buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)];
288     if (buckets.length < BUCKET_SIZES.length)
289       throw new BucketAllocatorException(
290           "Bucket allocator size too small - must have room for at least "
291               + BUCKET_SIZES.length + " buckets");
292     bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length];
293     for (int i = 0; i < BUCKET_SIZES.length; ++i) {
294       bucketSizeInfos[i] = new BucketSizeInfo(i);
295     }
296     for (int i = 0; i < buckets.length; ++i) {
297       buckets[i] = new Bucket(BUCKET_CAPACITY * i);
298       bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1]
299           .instantiateBucket(buckets[i]);
300     }
301     this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY;
302   }
303 
304   /**
305    * Rebuild the allocator's data structures from a persisted map.
306    * @param availableSpace capacity of cache
307    * @param map A map stores the block key and BucketEntry(block's meta data
308    *          like offset, length)
309    * @param realCacheSize cached data size statistics for bucket cache
310    * @throws BucketAllocatorException
311    */
312   BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map,
313       AtomicLong realCacheSize) throws BucketAllocatorException {
314     this(availableSpace);
315 
316     // each bucket has an offset, sizeindex. probably the buckets are too big
317     // in our default state. so what we do is reconfigure them according to what
318     // we've found. we can only reconfigure each bucket once; if more than once,
319     // we know there's a bug, so we just log the info, throw, and start again...
320     boolean[] reconfigured = new boolean[buckets.length];
321     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
322       long foundOffset = entry.getValue().offset();
323       int foundLen = entry.getValue().getLength();
324       int bucketSizeIndex = -1;
325       for (int i = 0; i < BUCKET_SIZES.length; ++i) {
326         if (foundLen <= BUCKET_SIZES[i]) {
327           bucketSizeIndex = i;
328           break;
329         }
330       }
331       if (bucketSizeIndex == -1) {
332         throw new BucketAllocatorException(
333             "Can't match bucket size for the block with size " + foundLen);
334       }
335       int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY);
336       if (bucketNo < 0 || bucketNo >= buckets.length)
337         throw new BucketAllocatorException("Can't find bucket " + bucketNo
338             + ", total buckets=" + buckets.length
339             + "; did you shrink the cache?");
340       Bucket b = buckets[bucketNo];
341       if (reconfigured[bucketNo] == true) {
342         if (b.sizeIndex() != bucketSizeIndex)
343           throw new BucketAllocatorException(
344               "Inconsistent allocation in bucket map;");
345       } else {
346         if (!b.isCompletelyFree())
347           throw new BucketAllocatorException("Reconfiguring bucket "
348               + bucketNo + " but it's already allocated; corrupt data");
349         // Need to remove the bucket from whichever list it's currently in at
350         // the moment...
351         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
352         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
353         oldbsi.removeBucket(b);
354         bsi.instantiateBucket(b);
355         reconfigured[bucketNo] = true;
356       }
357       realCacheSize.addAndGet(foundLen);
358       buckets[bucketNo].addAllocation(foundOffset);
359       usedSize += buckets[bucketNo].itemAllocationSize();
360       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
361     }
362   }
363 
364   public String getInfo() {
365     StringBuilder sb = new StringBuilder(1024);
366     for (int i = 0; i < buckets.length; ++i) {
367       Bucket b = buckets[i];
368       sb.append("    Bucket ").append(i).append(": ").append(b.itemAllocationSize());
369       sb.append(" freeCount=").append(b.freeCount()).append(" used=")
370           .append(b.usedCount());
371       sb.append('\n');
372     }
373     return sb.toString();
374   }
375 
376   public long getUsedSize() {
377     return this.usedSize;
378   }
379 
380   public long getFreeSize() {
381     long freeSize = this.totalSize - getUsedSize();
382     return freeSize;
383   }
384 
385   public long getTotalSize() {
386     return this.totalSize;
387   }
388 
389   /**
390    * Allocate a block with specified size. Return the offset
391    * @param blockSize size of block
392    * @throws BucketAllocatorException,CacheFullException
393    * @return the offset in the IOEngine
394    */
395   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
396       BucketAllocatorException {
397     assert blockSize > 0;
398     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
399     if (bsi == null) {
400       throw new BucketAllocatorException("Allocation too big size=" + blockSize);
401     }
402     long offset = bsi.allocateBlock();
403 
404     // Ask caller to free up space and try again!
405     if (offset < 0)
406       throw new CacheFullException(blockSize, bsi.sizeIndex());
407     usedSize += BUCKET_SIZES[bsi.sizeIndex()];
408     return offset;
409   }
410 
411   private Bucket grabGlobalCompletelyFreeBucket() {
412     for (BucketSizeInfo bsi : bucketSizeInfos) {
413       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
414       if (b != null) return b;
415     }
416     return null;
417   }
418 
419   /**
420    * Free a block with the offset
421    * @param offset block's offset
422    * @return size freed
423    */
424   public synchronized int freeBlock(long offset) {
425     int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
426     assert bucketNo >= 0 && bucketNo < buckets.length;
427     Bucket targetBucket = buckets[bucketNo];
428     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
429     usedSize -= targetBucket.itemAllocationSize();
430     return targetBucket.itemAllocationSize();
431   }
432 
433   public int sizeIndexOfAllocation(long offset) {
434     int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
435     assert bucketNo >= 0 && bucketNo < buckets.length;
436     Bucket targetBucket = buckets[bucketNo];
437     return targetBucket.sizeIndex();
438   }
439 
440   public int sizeOfAllocation(long offset) {
441     int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
442     assert bucketNo >= 0 && bucketNo < buckets.length;
443     Bucket targetBucket = buckets[bucketNo];
444     return targetBucket.itemAllocationSize();
445   }
446 
447   static public int getMaximumAllocationIndex() {
448     return BUCKET_SIZES.length;
449   }
450 
451   static class IndexStatistics {
452     private long freeCount, usedCount, itemSize, totalCount;
453 
454     public long freeCount() {
455       return freeCount;
456     }
457 
458     public long usedCount() {
459       return usedCount;
460     }
461 
462     public long totalCount() {
463       return totalCount;
464     }
465 
466     public long freeBytes() {
467       return freeCount * itemSize;
468     }
469 
470     public long usedBytes() {
471       return usedCount * itemSize;
472     }
473 
474     public long totalBytes() {
475       return totalCount * itemSize;
476     }
477 
478     public long itemSize() {
479       return itemSize;
480     }
481 
482     public IndexStatistics(long free, long used, long itemSize) {
483       setTo(free, used, itemSize);
484     }
485 
486     public IndexStatistics() {
487       setTo(-1, -1, 0);
488     }
489 
490     public void setTo(long free, long used, long itemSize) {
491       this.itemSize = itemSize;
492       this.freeCount = free;
493       this.usedCount = used;
494       this.totalCount = free + used;
495     }
496   }
497 
498   public void dumpToLog() {
499     logStatistics();
500     StringBuilder sb = new StringBuilder();
501     for (Bucket b : buckets) {
502       sb.append("Bucket:").append(b.baseOffset).append('\n');
503       sb.append("  Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
504           + "; used:" + b.usedCount + "; freelist\n");
505       for (int i = 0; i < b.freeCount(); ++i)
506         sb.append(b.freeList[i]).append(',');
507       sb.append('\n');
508     }
509     LOG.info(sb);
510   }
511 
512   public void logStatistics() {
513     IndexStatistics total = new IndexStatistics();
514     IndexStatistics[] stats = getIndexStatistics(total);
515     LOG.info("Bucket allocator statistics follow:\n");
516     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
517         + total.usedBytes() + "; total bytes=" + total.totalBytes());
518     for (IndexStatistics s : stats) {
519       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
520           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
521     }
522   }
523 
524   public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
525     IndexStatistics[] stats = getIndexStatistics();
526     long totalfree = 0, totalused = 0;
527     for (IndexStatistics stat : stats) {
528       totalfree += stat.freeBytes();
529       totalused += stat.usedBytes();
530     }
531     grandTotal.setTo(totalfree, totalused, 1);
532     return stats;
533   }
534 
535   public IndexStatistics[] getIndexStatistics() {
536     IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length];
537     for (int i = 0; i < stats.length; ++i)
538       stats[i] = bucketSizeInfos[i].statistics();
539     return stats;
540   }
541 
542   public long freeBlock(long freeList[]) {
543     long sz = 0;
544     for (int i = 0; i < freeList.length; ++i)
545       sz += freeBlock(freeList[i]);
546     return sz;
547   }
548 
549 }