View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import com.google.common.base.Objects;
29  import com.google.common.base.Preconditions;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
35  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
36  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
37  
38  /**
39   * This class is used to allocate a block with specified size and free the block
40   * when evicting. It manages an array of buckets, each bucket is associated with
41   * a size and caches elements up to this size. For a completely empty bucket, this
42   * size could be re-specified dynamically.
43   * 
44   * This class is not thread safe.
45   */
46  @InterfaceAudience.Private
47  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
48  public final class BucketAllocator {
49    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
50  
51    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
52    public final static class Bucket {
53      private long baseOffset;
54      private int itemAllocationSize, sizeIndex;
55      private int itemCount;
56      private int freeList[];
57      private int freeCount, usedCount;
58  
59      public Bucket(long offset) {
60        baseOffset = offset;
61        sizeIndex = -1;
62      }
63  
64      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
65        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
66        this.sizeIndex = sizeIndex;
67        itemAllocationSize = bucketSizes[sizeIndex];
68        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
69        freeCount = itemCount;
70        usedCount = 0;
71        freeList = new int[itemCount];
72        for (int i = 0; i < freeCount; ++i)
73          freeList[i] = i;
74      }
75  
76      public boolean isUninstantiated() {
77        return sizeIndex == -1;
78      }
79  
80      public int sizeIndex() {
81        return sizeIndex;
82      }
83  
84      public int getItemAllocationSize() {
85        return itemAllocationSize;
86      }
87  
88      public boolean hasFreeSpace() {
89        return freeCount > 0;
90      }
91  
92      public boolean isCompletelyFree() {
93        return usedCount == 0;
94      }
95  
96      public int freeCount() {
97        return freeCount;
98      }
99  
100     public int usedCount() {
101       return usedCount;
102     }
103 
104     public int getFreeBytes() {
105       return freeCount * itemAllocationSize;
106     }
107 
108     public int getUsedBytes() {
109       return usedCount * itemAllocationSize;
110     }
111 
112     public long getBaseOffset() {
113       return baseOffset;
114     }
115 
116     /**
117      * Allocate a block in this bucket, return the offset representing the
118      * position in physical space
119      * @return the offset in the IOEngine
120      */
121     public long allocate() {
122       assert freeCount > 0; // Else should not have been called
123       assert sizeIndex != -1;
124       ++usedCount;
125       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
126       assert offset >= 0;
127       return offset;
128     }
129 
130     public void addAllocation(long offset) throws BucketAllocatorException {
131       offset -= baseOffset;
132       if (offset < 0 || offset % itemAllocationSize != 0)
133         throw new BucketAllocatorException(
134             "Attempt to add allocation for bad offset: " + offset + " base="
135                 + baseOffset + ", bucket size=" + itemAllocationSize);
136       int idx = (int) (offset / itemAllocationSize);
137       boolean matchFound = false;
138       for (int i = 0; i < freeCount; ++i) {
139         if (matchFound) freeList[i - 1] = freeList[i];
140         else if (freeList[i] == idx) matchFound = true;
141       }
142       if (!matchFound)
143         throw new BucketAllocatorException("Couldn't find match for index "
144             + idx + " in free list");
145       ++usedCount;
146       --freeCount;
147     }
148 
149     private void free(long offset) {
150       offset -= baseOffset;
151       assert offset >= 0;
152       assert offset < itemCount * itemAllocationSize;
153       assert offset % itemAllocationSize == 0;
154       assert usedCount > 0;
155       assert freeCount < itemCount; // Else duplicate free
156       int item = (int) (offset / (long) itemAllocationSize);
157       assert !freeListContains(item);
158       --usedCount;
159       freeList[freeCount++] = item;
160     }
161 
162     private boolean freeListContains(int blockNo) {
163       for (int i = 0; i < freeCount; ++i) {
164         if (freeList[i] == blockNo) return true;
165       }
166       return false;
167     }
168   }
169 
170   final class BucketSizeInfo {
171     // Free bucket means it has space to allocate a block;
172     // Completely free bucket means it has no block.
173     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
174     private int sizeIndex;
175 
176     BucketSizeInfo(int sizeIndex) {
177       bucketList = new ArrayList<Bucket>();
178       freeBuckets = new ArrayList<Bucket>();
179       completelyFreeBuckets = new ArrayList<Bucket>();
180       this.sizeIndex = sizeIndex;
181     }
182 
183     public void instantiateBucket(Bucket b) {
184       assert b.isUninstantiated() || b.isCompletelyFree();
185       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
186       bucketList.add(b);
187       freeBuckets.add(b);
188       completelyFreeBuckets.add(b);
189     }
190 
191     public int sizeIndex() {
192       return sizeIndex;
193     }
194 
195     /**
196      * Find a bucket to allocate a block
197      * @return the offset in the IOEngine
198      */
199     public long allocateBlock() {
200       Bucket b = null;
201       if (freeBuckets.size() > 0) // Use up an existing one first...
202         b = freeBuckets.get(freeBuckets.size() - 1);
203       if (b == null) {
204         b = grabGlobalCompletelyFreeBucket();
205         if (b != null) instantiateBucket(b);
206       }
207       if (b == null) return -1;
208       long result = b.allocate();
209       blockAllocated(b);
210       return result;
211     }
212 
213     void blockAllocated(Bucket b) {
214       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
215       if (!b.hasFreeSpace()) freeBuckets.remove(b);
216     }
217 
218     public Bucket findAndRemoveCompletelyFreeBucket() {
219       Bucket b = null;
220       assert bucketList.size() > 0;
221       if (bucketList.size() == 1) {
222         // So we never get complete starvation of a bucket for a size
223         return null;
224       }
225 
226       if (completelyFreeBuckets.size() > 0) {
227         b = completelyFreeBuckets.get(0);
228         removeBucket(b);
229       }
230       return b;
231     }
232 
233     private void removeBucket(Bucket b) {
234       assert b.isCompletelyFree();
235       bucketList.remove(b);
236       freeBuckets.remove(b);
237       completelyFreeBuckets.remove(b);
238     }
239 
240     public void freeBlock(Bucket b, long offset) {
241       assert bucketList.contains(b);
242       // else we shouldn't have anything to free...
243       assert (!completelyFreeBuckets.contains(b));
244       b.free(offset);
245       if (!freeBuckets.contains(b)) freeBuckets.add(b);
246       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
247     }
248 
249     public IndexStatistics statistics() {
250       long free = 0, used = 0;
251       for (Bucket b : bucketList) {
252         free += b.freeCount();
253         used += b.usedCount();
254       }
255       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
256     }
257 
258     @Override
259     public String toString() {
260       return Objects.toStringHelper(this.getClass())
261         .add("sizeIndex", sizeIndex)
262         .add("bucketSize", bucketSizes[sizeIndex])
263         .toString();
264     }
265   }
266 
267   // Default block size is 64K, so we choose more sizes near 64K, you'd better
268   // reset it according to your cluster's block size distribution
269   // TODO Support the view of block size distribution statistics
270   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
271       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
272       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
273       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
274       512 * 1024 + 1024 };
275 
276   /**
277    * Round up the given block size to bucket size, and get the corresponding
278    * BucketSizeInfo
279    */
280   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
281     for (int i = 0; i < bucketSizes.length; ++i)
282       if (blockSize <= bucketSizes[i])
283         return bucketSizeInfos[i];
284     return null;
285   }
286 
287   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
288 
289   private final int[] bucketSizes;
290   private final int bigItemSize;
291   // The capacity size for each bucket
292   private final long bucketCapacity;
293   private Bucket[] buckets;
294   private BucketSizeInfo[] bucketSizeInfos;
295   private final long totalSize;
296   private long usedSize = 0;
297 
298   BucketAllocator(long availableSpace, int[] bucketSizes)
299       throws BucketAllocatorException {
300     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
301     int largestBucket = this.bucketSizes[0];
302     for (int i : this.bucketSizes) {
303       largestBucket = Math.max(largestBucket, i);
304     }
305     this.bigItemSize = largestBucket;
306     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
307     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
308     if (buckets.length < this.bucketSizes.length)
309       throw new BucketAllocatorException(
310           "Bucket allocator size too small - must have room for at least "
311               + this.bucketSizes.length + " buckets");
312     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
313     for (int i = 0; i < this.bucketSizes.length; ++i) {
314       bucketSizeInfos[i] = new BucketSizeInfo(i);
315     }
316     for (int i = 0; i < buckets.length; ++i) {
317       buckets[i] = new Bucket(bucketCapacity * i);
318       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
319           .instantiateBucket(buckets[i]);
320     }
321     this.totalSize = ((long) buckets.length) * bucketCapacity;
322   }
323 
324   /**
325    * Rebuild the allocator's data structures from a persisted map.
326    * @param availableSpace capacity of cache
327    * @param map A map stores the block key and BucketEntry(block's meta data
328    *          like offset, length)
329    * @param realCacheSize cached data size statistics for bucket cache
330    * @throws BucketAllocatorException
331    */
332   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
333       AtomicLong realCacheSize) throws BucketAllocatorException {
334     this(availableSpace, bucketSizes);
335 
336     // each bucket has an offset, sizeindex. probably the buckets are too big
337     // in our default state. so what we do is reconfigure them according to what
338     // we've found. we can only reconfigure each bucket once; if more than once,
339     // we know there's a bug, so we just log the info, throw, and start again...
340     boolean[] reconfigured = new boolean[buckets.length];
341     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
342       long foundOffset = entry.getValue().offset();
343       int foundLen = entry.getValue().getLength();
344       int bucketSizeIndex = -1;
345       for (int i = 0; i < bucketSizes.length; ++i) {
346         if (foundLen <= bucketSizes[i]) {
347           bucketSizeIndex = i;
348           break;
349         }
350       }
351       if (bucketSizeIndex == -1) {
352         throw new BucketAllocatorException(
353             "Can't match bucket size for the block with size " + foundLen);
354       }
355       int bucketNo = (int) (foundOffset / bucketCapacity);
356       if (bucketNo < 0 || bucketNo >= buckets.length)
357         throw new BucketAllocatorException("Can't find bucket " + bucketNo
358             + ", total buckets=" + buckets.length
359             + "; did you shrink the cache?");
360       Bucket b = buckets[bucketNo];
361       if (reconfigured[bucketNo]) {
362         if (b.sizeIndex() != bucketSizeIndex)
363           throw new BucketAllocatorException(
364               "Inconsistent allocation in bucket map;");
365       } else {
366         if (!b.isCompletelyFree())
367           throw new BucketAllocatorException("Reconfiguring bucket "
368               + bucketNo + " but it's already allocated; corrupt data");
369         // Need to remove the bucket from whichever list it's currently in at
370         // the moment...
371         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
372         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
373         oldbsi.removeBucket(b);
374         bsi.instantiateBucket(b);
375         reconfigured[bucketNo] = true;
376       }
377       realCacheSize.addAndGet(foundLen);
378       buckets[bucketNo].addAllocation(foundOffset);
379       usedSize += buckets[bucketNo].getItemAllocationSize();
380       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
381     }
382   }
383 
384   public String toString() {
385     StringBuilder sb = new StringBuilder(1024);
386     for (int i = 0; i < buckets.length; ++i) {
387       Bucket b = buckets[i];
388       if (i > 0) sb.append(", ");
389       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
390       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
391     }
392     return sb.toString();
393   }
394 
395   public long getUsedSize() {
396     return this.usedSize;
397   }
398 
399   public long getFreeSize() {
400     return this.totalSize - getUsedSize();
401   }
402 
403   public long getTotalSize() {
404     return this.totalSize;
405   }
406 
407   /**
408    * Allocate a block with specified size. Return the offset
409    * @param blockSize size of block
410    * @throws BucketAllocatorException,CacheFullException
411    * @return the offset in the IOEngine
412    */
413   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
414       BucketAllocatorException {
415     assert blockSize > 0;
416     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
417     if (bsi == null) {
418       throw new BucketAllocatorException("Allocation too big size=" + blockSize);
419     }
420     long offset = bsi.allocateBlock();
421 
422     // Ask caller to free up space and try again!
423     if (offset < 0)
424       throw new CacheFullException(blockSize, bsi.sizeIndex());
425     usedSize += bucketSizes[bsi.sizeIndex()];
426     return offset;
427   }
428 
429   private Bucket grabGlobalCompletelyFreeBucket() {
430     for (BucketSizeInfo bsi : bucketSizeInfos) {
431       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
432       if (b != null) return b;
433     }
434     return null;
435   }
436 
437   /**
438    * Free a block with the offset
439    * @param offset block's offset
440    * @return size freed
441    */
442   public synchronized int freeBlock(long offset) {
443     int bucketNo = (int) (offset / bucketCapacity);
444     assert bucketNo >= 0 && bucketNo < buckets.length;
445     Bucket targetBucket = buckets[bucketNo];
446     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
447     usedSize -= targetBucket.getItemAllocationSize();
448     return targetBucket.getItemAllocationSize();
449   }
450 
451   public int sizeIndexOfAllocation(long offset) {
452     int bucketNo = (int) (offset / bucketCapacity);
453     assert bucketNo >= 0 && bucketNo < buckets.length;
454     Bucket targetBucket = buckets[bucketNo];
455     return targetBucket.sizeIndex();
456   }
457 
458   public int sizeOfAllocation(long offset) {
459     int bucketNo = (int) (offset / bucketCapacity);
460     assert bucketNo >= 0 && bucketNo < buckets.length;
461     Bucket targetBucket = buckets[bucketNo];
462     return targetBucket.getItemAllocationSize();
463   }
464 
465   static class IndexStatistics {
466     private long freeCount, usedCount, itemSize, totalCount;
467 
468     public long freeCount() {
469       return freeCount;
470     }
471 
472     public long usedCount() {
473       return usedCount;
474     }
475 
476     public long totalCount() {
477       return totalCount;
478     }
479 
480     public long freeBytes() {
481       return freeCount * itemSize;
482     }
483 
484     public long usedBytes() {
485       return usedCount * itemSize;
486     }
487 
488     public long totalBytes() {
489       return totalCount * itemSize;
490     }
491 
492     public long itemSize() {
493       return itemSize;
494     }
495 
496     public IndexStatistics(long free, long used, long itemSize) {
497       setTo(free, used, itemSize);
498     }
499 
500     public IndexStatistics() {
501       setTo(-1, -1, 0);
502     }
503 
504     public void setTo(long free, long used, long itemSize) {
505       this.itemSize = itemSize;
506       this.freeCount = free;
507       this.usedCount = used;
508       this.totalCount = free + used;
509     }
510   }
511 
512   public Bucket [] getBuckets() {
513     return this.buckets;
514   }
515 
516   void logStatistics() {
517     IndexStatistics total = new IndexStatistics();
518     IndexStatistics[] stats = getIndexStatistics(total);
519     LOG.info("Bucket allocator statistics follow:\n");
520     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
521         + total.usedBytes() + "; total bytes=" + total.totalBytes());
522     for (IndexStatistics s : stats) {
523       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
524           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
525     }
526   }
527 
528   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
529     IndexStatistics[] stats = getIndexStatistics();
530     long totalfree = 0, totalused = 0;
531     for (IndexStatistics stat : stats) {
532       totalfree += stat.freeBytes();
533       totalused += stat.usedBytes();
534     }
535     grandTotal.setTo(totalfree, totalused, 1);
536     return stats;
537   }
538 
539   IndexStatistics[] getIndexStatistics() {
540     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
541     for (int i = 0; i < stats.length; ++i)
542       stats[i] = bucketSizeInfos[i].statistics();
543     return stats;
544   }
545 
546   public long freeBlock(long freeList[]) {
547     long sz = 0;
548     for (int i = 0; i < freeList.length; ++i)
549       sz += freeBlock(freeList[i]);
550     return sz;
551   }
552 
553 }