View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.Arrays;
24  import java.util.Map;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.collections.map.LinkedMap;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
32  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
33  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
34  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
35  
36  import com.google.common.base.Objects;
37  import com.google.common.base.Preconditions;
38  import com.google.common.primitives.Ints;
39  
40  /**
41   * This class is used to allocate a block with specified size and free the block
42   * when evicting. It manages an array of buckets, each bucket is associated with
43   * a size and caches elements up to this size. For a completely empty bucket, this
44   * size could be re-specified dynamically.
45   * 
46   * This class is not thread safe.
47   */
48  @InterfaceAudience.Private
49  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
50  public final class BucketAllocator {
51    private static final Log LOG = LogFactory.getLog(BucketAllocator.class);
52  
53    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
54    public final static class Bucket {
55      private long baseOffset;
56      private int itemAllocationSize, sizeIndex;
57      private int itemCount;
58      private int freeList[];
59      private int freeCount, usedCount;
60  
61      public Bucket(long offset) {
62        baseOffset = offset;
63        sizeIndex = -1;
64      }
65  
66      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
67        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
68        this.sizeIndex = sizeIndex;
69        itemAllocationSize = bucketSizes[sizeIndex];
70        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
71        freeCount = itemCount;
72        usedCount = 0;
73        freeList = new int[itemCount];
74        for (int i = 0; i < freeCount; ++i)
75          freeList[i] = i;
76      }
77  
78      public boolean isUninstantiated() {
79        return sizeIndex == -1;
80      }
81  
82      public int sizeIndex() {
83        return sizeIndex;
84      }
85  
86      public int getItemAllocationSize() {
87        return itemAllocationSize;
88      }
89  
90      public boolean hasFreeSpace() {
91        return freeCount > 0;
92      }
93  
94      public boolean isCompletelyFree() {
95        return usedCount == 0;
96      }
97  
98      public int freeCount() {
99        return freeCount;
100     }
101 
102     public int usedCount() {
103       return usedCount;
104     }
105 
106     public int getFreeBytes() {
107       return freeCount * itemAllocationSize;
108     }
109 
110     public int getUsedBytes() {
111       return usedCount * itemAllocationSize;
112     }
113 
114     public long getBaseOffset() {
115       return baseOffset;
116     }
117 
118     /**
119      * Allocate a block in this bucket, return the offset representing the
120      * position in physical space
121      * @return the offset in the IOEngine
122      */
123     public long allocate() {
124       assert freeCount > 0; // Else should not have been called
125       assert sizeIndex != -1;
126       ++usedCount;
127       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
128       assert offset >= 0;
129       return offset;
130     }
131 
132     public void addAllocation(long offset) throws BucketAllocatorException {
133       offset -= baseOffset;
134       if (offset < 0 || offset % itemAllocationSize != 0)
135         throw new BucketAllocatorException(
136             "Attempt to add allocation for bad offset: " + offset + " base="
137                 + baseOffset + ", bucket size=" + itemAllocationSize);
138       int idx = (int) (offset / itemAllocationSize);
139       boolean matchFound = false;
140       for (int i = 0; i < freeCount; ++i) {
141         if (matchFound) freeList[i - 1] = freeList[i];
142         else if (freeList[i] == idx) matchFound = true;
143       }
144       if (!matchFound)
145         throw new BucketAllocatorException("Couldn't find match for index "
146             + idx + " in free list");
147       ++usedCount;
148       --freeCount;
149     }
150 
151     private void free(long offset) {
152       offset -= baseOffset;
153       assert offset >= 0;
154       assert offset < itemCount * itemAllocationSize;
155       assert offset % itemAllocationSize == 0;
156       assert usedCount > 0;
157       assert freeCount < itemCount; // Else duplicate free
158       int item = (int) (offset / (long) itemAllocationSize);
159       assert !freeListContains(item);
160       --usedCount;
161       freeList[freeCount++] = item;
162     }
163 
164     private boolean freeListContains(int blockNo) {
165       for (int i = 0; i < freeCount; ++i) {
166         if (freeList[i] == blockNo) return true;
167       }
168       return false;
169     }
170   }
171 
172   final class BucketSizeInfo {
173     // Free bucket means it has space to allocate a block;
174     // Completely free bucket means it has no block.
175     private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
176     private int sizeIndex;
177 
178     BucketSizeInfo(int sizeIndex) {
179       bucketList = new LinkedMap();
180       freeBuckets = new LinkedMap();
181       completelyFreeBuckets = new LinkedMap();
182       this.sizeIndex = sizeIndex;
183     }
184 
185     public synchronized void instantiateBucket(Bucket b) {
186       assert b.isUninstantiated() || b.isCompletelyFree();
187       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
188       bucketList.put(b, b);
189       freeBuckets.put(b, b);
190       completelyFreeBuckets.put(b, b);
191     }
192 
193     public int sizeIndex() {
194       return sizeIndex;
195     }
196 
197     /**
198      * Find a bucket to allocate a block
199      * @return the offset in the IOEngine
200      */
201     public long allocateBlock() {
202       Bucket b = null;
203       if (freeBuckets.size() > 0) {
204         // Use up an existing one first...
205         b = (Bucket) freeBuckets.lastKey();
206       }
207       if (b == null) {
208         b = grabGlobalCompletelyFreeBucket();
209         if (b != null) instantiateBucket(b);
210       }
211       if (b == null) return -1;
212       long result = b.allocate();
213       blockAllocated(b);
214       return result;
215     }
216 
217     void blockAllocated(Bucket b) {
218       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
219       if (!b.hasFreeSpace()) freeBuckets.remove(b);
220     }
221 
222     public Bucket findAndRemoveCompletelyFreeBucket() {
223       Bucket b = null;
224       assert bucketList.size() > 0;
225       if (bucketList.size() == 1) {
226         // So we never get complete starvation of a bucket for a size
227         return null;
228       }
229 
230       if (completelyFreeBuckets.size() > 0) {
231         b = (Bucket) completelyFreeBuckets.firstKey();
232         removeBucket(b);
233       }
234       return b;
235     }
236 
237     private synchronized void removeBucket(Bucket b) {
238       assert b.isCompletelyFree();
239       bucketList.remove(b);
240       freeBuckets.remove(b);
241       completelyFreeBuckets.remove(b);
242     }
243 
244     public void freeBlock(Bucket b, long offset) {
245       assert bucketList.containsKey(b);
246       // else we shouldn't have anything to free...
247       assert (!completelyFreeBuckets.containsKey(b));
248       b.free(offset);
249       if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
250       if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
251     }
252 
253     public synchronized IndexStatistics statistics() {
254       long free = 0, used = 0;
255       for (Object obj : bucketList.keySet()) {
256         Bucket b = (Bucket) obj;
257         free += b.freeCount();
258         used += b.usedCount();
259       }
260       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
261     }
262 
263     @Override
264     public String toString() {
265       return Objects.toStringHelper(this.getClass())
266         .add("sizeIndex", sizeIndex)
267         .add("bucketSize", bucketSizes[sizeIndex])
268         .toString();
269     }
270   }
271 
  // Item sizes used when the caller passes no bucket sizes to the constructor.
  // The default block size is 64K, so more sizes are clustered around 64K;
  // tune this to the block size distribution of your own cluster.
  // Each entry is a power-of-two-ish payload size plus 1024 bytes
  // (NOTE(review): the extra 1K is presumably headroom for per-block
  // overhead -- confirm).
  // TODO Support the view of block size distribution statistics
  private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
      512 * 1024 + 1024 };
280 
281   /**
282    * Round up the given block size to bucket size, and get the corresponding
283    * BucketSizeInfo
284    */
285   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
286     for (int i = 0; i < bucketSizes.length; ++i)
287       if (blockSize <= bucketSizes[i])
288         return bucketSizeInfos[i];
289     return null;
290   }
291 
  // Each bucket spans this many items of the largest bucket size
  // (see how bucketCapacity is computed in the constructor).
  static public final int FEWEST_ITEMS_IN_BUCKET = 4;

  // Supported item sizes; sorted ascending by the constructor.
  private final int[] bucketSizes;
  // Largest value in bucketSizes.
  private final int bigItemSize;
  // The capacity size for each bucket, in bytes; identical for all buckets
  private final long bucketCapacity;
  // All buckets; bucket i starts at offset bucketCapacity * i.
  private Bucket[] buckets;
  // One bookkeeping object per entry of bucketSizes, at the same index.
  private BucketSizeInfo[] bucketSizeInfos;
  // buckets.length * bucketCapacity.
  private final long totalSize;
  // Sum of the item allocation sizes of all currently allocated blocks.
  private long usedSize = 0;
302 
303   BucketAllocator(long availableSpace, int[] bucketSizes)
304       throws BucketAllocatorException {
305     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
306     Arrays.sort(this.bucketSizes);
307     this.bigItemSize = Ints.max(this.bucketSizes);
308     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
309     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
310     if (buckets.length < this.bucketSizes.length)
311       throw new BucketAllocatorException(
312           "Bucket allocator size too small - must have room for at least "
313               + this.bucketSizes.length + " buckets");
314     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
315     for (int i = 0; i < this.bucketSizes.length; ++i) {
316       bucketSizeInfos[i] = new BucketSizeInfo(i);
317     }
318     for (int i = 0; i < buckets.length; ++i) {
319       buckets[i] = new Bucket(bucketCapacity * i);
320       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
321           .instantiateBucket(buckets[i]);
322     }
323     this.totalSize = ((long) buckets.length) * bucketCapacity;
324   }
325 
326   /**
327    * Rebuild the allocator's data structures from a persisted map.
328    * @param availableSpace capacity of cache
329    * @param map A map stores the block key and BucketEntry(block's meta data
330    *          like offset, length)
331    * @param realCacheSize cached data size statistics for bucket cache
332    * @throws BucketAllocatorException
333    */
334   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
335       AtomicLong realCacheSize) throws BucketAllocatorException {
336     this(availableSpace, bucketSizes);
337 
338     // each bucket has an offset, sizeindex. probably the buckets are too big
339     // in our default state. so what we do is reconfigure them according to what
340     // we've found. we can only reconfigure each bucket once; if more than once,
341     // we know there's a bug, so we just log the info, throw, and start again...
342     boolean[] reconfigured = new boolean[buckets.length];
343     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
344       long foundOffset = entry.getValue().offset();
345       int foundLen = entry.getValue().getLength();
346       int bucketSizeIndex = -1;
347       for (int i = 0; i < bucketSizes.length; ++i) {
348         if (foundLen <= bucketSizes[i]) {
349           bucketSizeIndex = i;
350           break;
351         }
352       }
353       if (bucketSizeIndex == -1) {
354         throw new BucketAllocatorException(
355             "Can't match bucket size for the block with size " + foundLen);
356       }
357       int bucketNo = (int) (foundOffset / bucketCapacity);
358       if (bucketNo < 0 || bucketNo >= buckets.length)
359         throw new BucketAllocatorException("Can't find bucket " + bucketNo
360             + ", total buckets=" + buckets.length
361             + "; did you shrink the cache?");
362       Bucket b = buckets[bucketNo];
363       if (reconfigured[bucketNo]) {
364         if (b.sizeIndex() != bucketSizeIndex)
365           throw new BucketAllocatorException(
366               "Inconsistent allocation in bucket map;");
367       } else {
368         if (!b.isCompletelyFree())
369           throw new BucketAllocatorException("Reconfiguring bucket "
370               + bucketNo + " but it's already allocated; corrupt data");
371         // Need to remove the bucket from whichever list it's currently in at
372         // the moment...
373         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
374         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
375         oldbsi.removeBucket(b);
376         bsi.instantiateBucket(b);
377         reconfigured[bucketNo] = true;
378       }
379       realCacheSize.addAndGet(foundLen);
380       buckets[bucketNo].addAllocation(foundOffset);
381       usedSize += buckets[bucketNo].getItemAllocationSize();
382       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
383     }
384   }
385 
386   public String toString() {
387     StringBuilder sb = new StringBuilder(1024);
388     for (int i = 0; i < buckets.length; ++i) {
389       Bucket b = buckets[i];
390       if (i > 0) sb.append(", ");
391       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
392       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
393     }
394     return sb.toString();
395   }
396 
397   public long getUsedSize() {
398     return this.usedSize;
399   }
400 
401   public long getFreeSize() {
402     return this.totalSize - getUsedSize();
403   }
404 
405   public long getTotalSize() {
406     return this.totalSize;
407   }
408 
409   /**
410    * Allocate a block with specified size. Return the offset
411    * @param blockSize size of block
412    * @throws BucketAllocatorException
413    * @throws CacheFullException
414    * @return the offset in the IOEngine
415    */
416   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
417       BucketAllocatorException {
418     assert blockSize > 0;
419     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
420     if (bsi == null) {
421       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
422         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
423         " to accomodate if size seems reasonable and you want it cached.");
424     }
425     long offset = bsi.allocateBlock();
426 
427     // Ask caller to free up space and try again!
428     if (offset < 0)
429       throw new CacheFullException(blockSize, bsi.sizeIndex());
430     usedSize += bucketSizes[bsi.sizeIndex()];
431     return offset;
432   }
433 
434   private Bucket grabGlobalCompletelyFreeBucket() {
435     for (BucketSizeInfo bsi : bucketSizeInfos) {
436       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
437       if (b != null) return b;
438     }
439     return null;
440   }
441 
442   /**
443    * Free a block with the offset
444    * @param offset block's offset
445    * @return size freed
446    */
447   public synchronized int freeBlock(long offset) {
448     int bucketNo = (int) (offset / bucketCapacity);
449     assert bucketNo >= 0 && bucketNo < buckets.length;
450     Bucket targetBucket = buckets[bucketNo];
451     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
452     usedSize -= targetBucket.getItemAllocationSize();
453     return targetBucket.getItemAllocationSize();
454   }
455 
456   public int sizeIndexOfAllocation(long offset) {
457     int bucketNo = (int) (offset / bucketCapacity);
458     assert bucketNo >= 0 && bucketNo < buckets.length;
459     Bucket targetBucket = buckets[bucketNo];
460     return targetBucket.sizeIndex();
461   }
462 
463   public int sizeOfAllocation(long offset) {
464     int bucketNo = (int) (offset / bucketCapacity);
465     assert bucketNo >= 0 && bucketNo < buckets.length;
466     Bucket targetBucket = buckets[bucketNo];
467     return targetBucket.getItemAllocationSize();
468   }
469 
470   static class IndexStatistics {
471     private long freeCount, usedCount, itemSize, totalCount;
472 
473     public long freeCount() {
474       return freeCount;
475     }
476 
477     public long usedCount() {
478       return usedCount;
479     }
480 
481     public long totalCount() {
482       return totalCount;
483     }
484 
485     public long freeBytes() {
486       return freeCount * itemSize;
487     }
488 
489     public long usedBytes() {
490       return usedCount * itemSize;
491     }
492 
493     public long totalBytes() {
494       return totalCount * itemSize;
495     }
496 
497     public long itemSize() {
498       return itemSize;
499     }
500 
501     public IndexStatistics(long free, long used, long itemSize) {
502       setTo(free, used, itemSize);
503     }
504 
505     public IndexStatistics() {
506       setTo(-1, -1, 0);
507     }
508 
509     public void setTo(long free, long used, long itemSize) {
510       this.itemSize = itemSize;
511       this.freeCount = free;
512       this.usedCount = used;
513       this.totalCount = free + used;
514     }
515   }
516 
517   public Bucket [] getBuckets() {
518     return this.buckets;
519   }
520 
521   void logStatistics() {
522     IndexStatistics total = new IndexStatistics();
523     IndexStatistics[] stats = getIndexStatistics(total);
524     LOG.info("Bucket allocator statistics follow:\n");
525     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
526         + total.usedBytes() + "; total bytes=" + total.totalBytes());
527     for (IndexStatistics s : stats) {
528       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
529           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
530     }
531   }
532 
533   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
534     IndexStatistics[] stats = getIndexStatistics();
535     long totalfree = 0, totalused = 0;
536     for (IndexStatistics stat : stats) {
537       totalfree += stat.freeBytes();
538       totalused += stat.usedBytes();
539     }
540     grandTotal.setTo(totalfree, totalused, 1);
541     return stats;
542   }
543 
544   IndexStatistics[] getIndexStatistics() {
545     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
546     for (int i = 0; i < stats.length; ++i)
547       stats[i] = bucketSizeInfos[i].statistics();
548     return stats;
549   }
550 
551   public long freeBlock(long freeList[]) {
552     long sz = 0;
553     for (int i = 0; i < freeList.length; ++i)
554       sz += freeBlock(freeList[i]);
555     return sz;
556   }
557 
558 }