View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.Arrays;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
33  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
34  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
35  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
36  
37  import com.google.common.base.Objects;
38  import com.google.common.base.Preconditions;
39  import com.google.common.primitives.Ints;
40  
41  /**
42   * This class is used to allocate a block with specified size and free the block
43   * when evicting. It manages an array of buckets, each bucket is associated with
44   * a size and caches elements up to this size. For a completely empty bucket, this
45   * size could be re-specified dynamically.
46   * 
47   * This class is not thread safe.
48   */
49  @InterfaceAudience.Private
50  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
51  public final class BucketAllocator {
52    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
53  
54    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
55    public final static class Bucket {
56      private long baseOffset;
57      private int itemAllocationSize, sizeIndex;
58      private int itemCount;
59      private int freeList[];
60      private int freeCount, usedCount;
61  
62      public Bucket(long offset) {
63        baseOffset = offset;
64        sizeIndex = -1;
65      }
66  
67      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
68        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
69        this.sizeIndex = sizeIndex;
70        itemAllocationSize = bucketSizes[sizeIndex];
71        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
72        freeCount = itemCount;
73        usedCount = 0;
74        freeList = new int[itemCount];
75        for (int i = 0; i < freeCount; ++i)
76          freeList[i] = i;
77      }
78  
79      public boolean isUninstantiated() {
80        return sizeIndex == -1;
81      }
82  
83      public int sizeIndex() {
84        return sizeIndex;
85      }
86  
87      public int getItemAllocationSize() {
88        return itemAllocationSize;
89      }
90  
91      public boolean hasFreeSpace() {
92        return freeCount > 0;
93      }
94  
95      public boolean isCompletelyFree() {
96        return usedCount == 0;
97      }
98  
99      public int freeCount() {
100       return freeCount;
101     }
102 
103     public int usedCount() {
104       return usedCount;
105     }
106 
107     public int getFreeBytes() {
108       return freeCount * itemAllocationSize;
109     }
110 
111     public int getUsedBytes() {
112       return usedCount * itemAllocationSize;
113     }
114 
115     public long getBaseOffset() {
116       return baseOffset;
117     }
118 
119     /**
120      * Allocate a block in this bucket, return the offset representing the
121      * position in physical space
122      * @return the offset in the IOEngine
123      */
124     public long allocate() {
125       assert freeCount > 0; // Else should not have been called
126       assert sizeIndex != -1;
127       ++usedCount;
128       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
129       assert offset >= 0;
130       return offset;
131     }
132 
133     public void addAllocation(long offset) throws BucketAllocatorException {
134       offset -= baseOffset;
135       if (offset < 0 || offset % itemAllocationSize != 0)
136         throw new BucketAllocatorException(
137             "Attempt to add allocation for bad offset: " + offset + " base="
138                 + baseOffset + ", bucket size=" + itemAllocationSize);
139       int idx = (int) (offset / itemAllocationSize);
140       boolean matchFound = false;
141       for (int i = 0; i < freeCount; ++i) {
142         if (matchFound) freeList[i - 1] = freeList[i];
143         else if (freeList[i] == idx) matchFound = true;
144       }
145       if (!matchFound)
146         throw new BucketAllocatorException("Couldn't find match for index "
147             + idx + " in free list");
148       ++usedCount;
149       --freeCount;
150     }
151 
152     private void free(long offset) {
153       offset -= baseOffset;
154       assert offset >= 0;
155       assert offset < itemCount * itemAllocationSize;
156       assert offset % itemAllocationSize == 0;
157       assert usedCount > 0;
158       assert freeCount < itemCount; // Else duplicate free
159       int item = (int) (offset / (long) itemAllocationSize);
160       assert !freeListContains(item);
161       --usedCount;
162       freeList[freeCount++] = item;
163     }
164 
165     private boolean freeListContains(int blockNo) {
166       for (int i = 0; i < freeCount; ++i) {
167         if (freeList[i] == blockNo) return true;
168       }
169       return false;
170     }
171   }
172 
173   final class BucketSizeInfo {
174     // Free bucket means it has space to allocate a block;
175     // Completely free bucket means it has no block.
176     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
177     private int sizeIndex;
178 
179     BucketSizeInfo(int sizeIndex) {
180       bucketList = new LinkedList<Bucket>();
181       freeBuckets = new LinkedList<Bucket>();
182       completelyFreeBuckets = new LinkedList<Bucket>();
183       this.sizeIndex = sizeIndex;
184     }
185 
186     public synchronized void instantiateBucket(Bucket b) {
187       assert b.isUninstantiated() || b.isCompletelyFree();
188       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
189       bucketList.add(b);
190       freeBuckets.add(b);
191       completelyFreeBuckets.add(b);
192     }
193 
194     public int sizeIndex() {
195       return sizeIndex;
196     }
197 
198     /**
199      * Find a bucket to allocate a block
200      * @return the offset in the IOEngine
201      */
202     public long allocateBlock() {
203       Bucket b = null;
204       if (freeBuckets.size() > 0) // Use up an existing one first...
205         b = freeBuckets.get(freeBuckets.size() - 1);
206       if (b == null) {
207         b = grabGlobalCompletelyFreeBucket();
208         if (b != null) instantiateBucket(b);
209       }
210       if (b == null) return -1;
211       long result = b.allocate();
212       blockAllocated(b);
213       return result;
214     }
215 
216     void blockAllocated(Bucket b) {
217       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
218       if (!b.hasFreeSpace()) freeBuckets.remove(b);
219     }
220 
221     public Bucket findAndRemoveCompletelyFreeBucket() {
222       Bucket b = null;
223       assert bucketList.size() > 0;
224       if (bucketList.size() == 1) {
225         // So we never get complete starvation of a bucket for a size
226         return null;
227       }
228 
229       if (completelyFreeBuckets.size() > 0) {
230         b = completelyFreeBuckets.get(0);
231         removeBucket(b);
232       }
233       return b;
234     }
235 
236     private synchronized void removeBucket(Bucket b) {
237       assert b.isCompletelyFree();
238       bucketList.remove(b);
239       freeBuckets.remove(b);
240       completelyFreeBuckets.remove(b);
241     }
242 
243     public void freeBlock(Bucket b, long offset) {
244       assert bucketList.contains(b);
245       // else we shouldn't have anything to free...
246       assert (!completelyFreeBuckets.contains(b));
247       b.free(offset);
248       if (!freeBuckets.contains(b)) freeBuckets.add(b);
249       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
250     }
251 
252     public synchronized IndexStatistics statistics() {
253       long free = 0, used = 0;
254       for (Bucket b : bucketList) {
255         free += b.freeCount();
256         used += b.usedCount();
257       }
258       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
259     }
260 
261     @Override
262     public String toString() {
263       return Objects.toStringHelper(this.getClass())
264         .add("sizeIndex", sizeIndex)
265         .add("bucketSize", bucketSizes[sizeIndex])
266         .toString();
267     }
268   }
269 
270   // Default block size is 64K, so we choose more sizes near 64K, you'd better
271   // reset it according to your cluster's block size distribution
272   // TODO Support the view of block size distribution statistics
273   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
274       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
275       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
276       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
277       512 * 1024 + 1024 };
278 
279   /**
280    * Round up the given block size to bucket size, and get the corresponding
281    * BucketSizeInfo
282    */
283   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
284     for (int i = 0; i < bucketSizes.length; ++i)
285       if (blockSize <= bucketSizes[i])
286         return bucketSizeInfos[i];
287     return null;
288   }
289 
290   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
291 
292   private final int[] bucketSizes;
293   private final int bigItemSize;
294   // The capacity size for each bucket
295   private final long bucketCapacity;
296   private Bucket[] buckets;
297   private BucketSizeInfo[] bucketSizeInfos;
298   private final long totalSize;
299   private long usedSize = 0;
300 
301   BucketAllocator(long availableSpace, int[] bucketSizes)
302       throws BucketAllocatorException {
303     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
304     Arrays.sort(this.bucketSizes);
305     this.bigItemSize = Ints.max(this.bucketSizes);
306     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
307     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
308     if (buckets.length < this.bucketSizes.length)
309       throw new BucketAllocatorException(
310           "Bucket allocator size too small - must have room for at least "
311               + this.bucketSizes.length + " buckets");
312     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
313     for (int i = 0; i < this.bucketSizes.length; ++i) {
314       bucketSizeInfos[i] = new BucketSizeInfo(i);
315     }
316     for (int i = 0; i < buckets.length; ++i) {
317       buckets[i] = new Bucket(bucketCapacity * i);
318       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
319           .instantiateBucket(buckets[i]);
320     }
321     this.totalSize = ((long) buckets.length) * bucketCapacity;
322   }
323 
324   /**
325    * Rebuild the allocator's data structures from a persisted map.
326    * @param availableSpace capacity of cache
327    * @param map A map stores the block key and BucketEntry(block's meta data
328    *          like offset, length)
329    * @param realCacheSize cached data size statistics for bucket cache
330    * @throws BucketAllocatorException
331    */
332   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
333       AtomicLong realCacheSize) throws BucketAllocatorException {
334     this(availableSpace, bucketSizes);
335 
336     // each bucket has an offset, sizeindex. probably the buckets are too big
337     // in our default state. so what we do is reconfigure them according to what
338     // we've found. we can only reconfigure each bucket once; if more than once,
339     // we know there's a bug, so we just log the info, throw, and start again...
340     boolean[] reconfigured = new boolean[buckets.length];
341     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
342       long foundOffset = entry.getValue().offset();
343       int foundLen = entry.getValue().getLength();
344       int bucketSizeIndex = -1;
345       for (int i = 0; i < bucketSizes.length; ++i) {
346         if (foundLen <= bucketSizes[i]) {
347           bucketSizeIndex = i;
348           break;
349         }
350       }
351       if (bucketSizeIndex == -1) {
352         throw new BucketAllocatorException(
353             "Can't match bucket size for the block with size " + foundLen);
354       }
355       int bucketNo = (int) (foundOffset / bucketCapacity);
356       if (bucketNo < 0 || bucketNo >= buckets.length)
357         throw new BucketAllocatorException("Can't find bucket " + bucketNo
358             + ", total buckets=" + buckets.length
359             + "; did you shrink the cache?");
360       Bucket b = buckets[bucketNo];
361       if (reconfigured[bucketNo]) {
362         if (b.sizeIndex() != bucketSizeIndex)
363           throw new BucketAllocatorException(
364               "Inconsistent allocation in bucket map;");
365       } else {
366         if (!b.isCompletelyFree())
367           throw new BucketAllocatorException("Reconfiguring bucket "
368               + bucketNo + " but it's already allocated; corrupt data");
369         // Need to remove the bucket from whichever list it's currently in at
370         // the moment...
371         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
372         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
373         oldbsi.removeBucket(b);
374         bsi.instantiateBucket(b);
375         reconfigured[bucketNo] = true;
376       }
377       realCacheSize.addAndGet(foundLen);
378       buckets[bucketNo].addAllocation(foundOffset);
379       usedSize += buckets[bucketNo].getItemAllocationSize();
380       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
381     }
382   }
383 
384   public String toString() {
385     StringBuilder sb = new StringBuilder(1024);
386     for (int i = 0; i < buckets.length; ++i) {
387       Bucket b = buckets[i];
388       if (i > 0) sb.append(", ");
389       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
390       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
391     }
392     return sb.toString();
393   }
394 
395   public long getUsedSize() {
396     return this.usedSize;
397   }
398 
399   public long getFreeSize() {
400     return this.totalSize - getUsedSize();
401   }
402 
403   public long getTotalSize() {
404     return this.totalSize;
405   }
406 
407   /**
408    * Allocate a block with specified size. Return the offset
409    * @param blockSize size of block
410    * @throws BucketAllocatorException,CacheFullException
411    * @return the offset in the IOEngine
412    */
413   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
414       BucketAllocatorException {
415     assert blockSize > 0;
416     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
417     if (bsi == null) {
418       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
419         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
420         " to accomodate if size seems reasonable and you want it cached.");
421     }
422     long offset = bsi.allocateBlock();
423 
424     // Ask caller to free up space and try again!
425     if (offset < 0)
426       throw new CacheFullException(blockSize, bsi.sizeIndex());
427     usedSize += bucketSizes[bsi.sizeIndex()];
428     return offset;
429   }
430 
431   private Bucket grabGlobalCompletelyFreeBucket() {
432     for (BucketSizeInfo bsi : bucketSizeInfos) {
433       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
434       if (b != null) return b;
435     }
436     return null;
437   }
438 
439   /**
440    * Free a block with the offset
441    * @param offset block's offset
442    * @return size freed
443    */
444   public synchronized int freeBlock(long offset) {
445     int bucketNo = (int) (offset / bucketCapacity);
446     assert bucketNo >= 0 && bucketNo < buckets.length;
447     Bucket targetBucket = buckets[bucketNo];
448     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
449     usedSize -= targetBucket.getItemAllocationSize();
450     return targetBucket.getItemAllocationSize();
451   }
452 
453   public int sizeIndexOfAllocation(long offset) {
454     int bucketNo = (int) (offset / bucketCapacity);
455     assert bucketNo >= 0 && bucketNo < buckets.length;
456     Bucket targetBucket = buckets[bucketNo];
457     return targetBucket.sizeIndex();
458   }
459 
460   public int sizeOfAllocation(long offset) {
461     int bucketNo = (int) (offset / bucketCapacity);
462     assert bucketNo >= 0 && bucketNo < buckets.length;
463     Bucket targetBucket = buckets[bucketNo];
464     return targetBucket.getItemAllocationSize();
465   }
466 
467   static class IndexStatistics {
468     private long freeCount, usedCount, itemSize, totalCount;
469 
470     public long freeCount() {
471       return freeCount;
472     }
473 
474     public long usedCount() {
475       return usedCount;
476     }
477 
478     public long totalCount() {
479       return totalCount;
480     }
481 
482     public long freeBytes() {
483       return freeCount * itemSize;
484     }
485 
486     public long usedBytes() {
487       return usedCount * itemSize;
488     }
489 
490     public long totalBytes() {
491       return totalCount * itemSize;
492     }
493 
494     public long itemSize() {
495       return itemSize;
496     }
497 
498     public IndexStatistics(long free, long used, long itemSize) {
499       setTo(free, used, itemSize);
500     }
501 
502     public IndexStatistics() {
503       setTo(-1, -1, 0);
504     }
505 
506     public void setTo(long free, long used, long itemSize) {
507       this.itemSize = itemSize;
508       this.freeCount = free;
509       this.usedCount = used;
510       this.totalCount = free + used;
511     }
512   }
513 
514   public Bucket [] getBuckets() {
515     return this.buckets;
516   }
517 
518   void logStatistics() {
519     IndexStatistics total = new IndexStatistics();
520     IndexStatistics[] stats = getIndexStatistics(total);
521     LOG.info("Bucket allocator statistics follow:\n");
522     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
523         + total.usedBytes() + "; total bytes=" + total.totalBytes());
524     for (IndexStatistics s : stats) {
525       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
526           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
527     }
528   }
529 
530   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
531     IndexStatistics[] stats = getIndexStatistics();
532     long totalfree = 0, totalused = 0;
533     for (IndexStatistics stat : stats) {
534       totalfree += stat.freeBytes();
535       totalused += stat.usedBytes();
536     }
537     grandTotal.setTo(totalfree, totalused, 1);
538     return stats;
539   }
540 
541   IndexStatistics[] getIndexStatistics() {
542     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
543     for (int i = 0; i < stats.length; ++i)
544       stats[i] = bucketSizeInfos[i].statistics();
545     return stats;
546   }
547 
548   public long freeBlock(long freeList[]) {
549     long sz = 0;
550     for (int i = 0; i < freeList.length; ++i)
551       sz += freeBlock(freeList[i]);
552     return sz;
553   }
554 
555 }