View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import com.google.common.base.Objects;
29  import com.google.common.base.Preconditions;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
35  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
36  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
37  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
38  
39  /**
40   * This class is used to allocate a block with specified size and free the block
41   * when evicting. It manages an array of buckets, each bucket is associated with
42   * a size and caches elements up to this size. For a completely empty bucket, this
43   * size could be re-specified dynamically.
44   * 
45   * This class is not thread safe.
46   */
47  @InterfaceAudience.Private
48  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
49  public final class BucketAllocator {
50    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
51  
52    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
53    public final static class Bucket {
54      private long baseOffset;
55      private int itemAllocationSize, sizeIndex;
56      private int itemCount;
57      private int freeList[];
58      private int freeCount, usedCount;
59  
60      public Bucket(long offset) {
61        baseOffset = offset;
62        sizeIndex = -1;
63      }
64  
65      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
66        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
67        this.sizeIndex = sizeIndex;
68        itemAllocationSize = bucketSizes[sizeIndex];
69        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
70        freeCount = itemCount;
71        usedCount = 0;
72        freeList = new int[itemCount];
73        for (int i = 0; i < freeCount; ++i)
74          freeList[i] = i;
75      }
76  
77      public boolean isUninstantiated() {
78        return sizeIndex == -1;
79      }
80  
81      public int sizeIndex() {
82        return sizeIndex;
83      }
84  
85      public int getItemAllocationSize() {
86        return itemAllocationSize;
87      }
88  
89      public boolean hasFreeSpace() {
90        return freeCount > 0;
91      }
92  
93      public boolean isCompletelyFree() {
94        return usedCount == 0;
95      }
96  
97      public int freeCount() {
98        return freeCount;
99      }
100 
101     public int usedCount() {
102       return usedCount;
103     }
104 
105     public int getFreeBytes() {
106       return freeCount * itemAllocationSize;
107     }
108 
109     public int getUsedBytes() {
110       return usedCount * itemAllocationSize;
111     }
112 
113     public long getBaseOffset() {
114       return baseOffset;
115     }
116 
117     /**
118      * Allocate a block in this bucket, return the offset representing the
119      * position in physical space
120      * @return the offset in the IOEngine
121      */
122     public long allocate() {
123       assert freeCount > 0; // Else should not have been called
124       assert sizeIndex != -1;
125       ++usedCount;
126       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
127       assert offset >= 0;
128       return offset;
129     }
130 
131     public void addAllocation(long offset) throws BucketAllocatorException {
132       offset -= baseOffset;
133       if (offset < 0 || offset % itemAllocationSize != 0)
134         throw new BucketAllocatorException(
135             "Attempt to add allocation for bad offset: " + offset + " base="
136                 + baseOffset + ", bucket size=" + itemAllocationSize);
137       int idx = (int) (offset / itemAllocationSize);
138       boolean matchFound = false;
139       for (int i = 0; i < freeCount; ++i) {
140         if (matchFound) freeList[i - 1] = freeList[i];
141         else if (freeList[i] == idx) matchFound = true;
142       }
143       if (!matchFound)
144         throw new BucketAllocatorException("Couldn't find match for index "
145             + idx + " in free list");
146       ++usedCount;
147       --freeCount;
148     }
149 
150     private void free(long offset) {
151       offset -= baseOffset;
152       assert offset >= 0;
153       assert offset < itemCount * itemAllocationSize;
154       assert offset % itemAllocationSize == 0;
155       assert usedCount > 0;
156       assert freeCount < itemCount; // Else duplicate free
157       int item = (int) (offset / (long) itemAllocationSize);
158       assert !freeListContains(item);
159       --usedCount;
160       freeList[freeCount++] = item;
161     }
162 
163     private boolean freeListContains(int blockNo) {
164       for (int i = 0; i < freeCount; ++i) {
165         if (freeList[i] == blockNo) return true;
166       }
167       return false;
168     }
169   }
170 
171   final class BucketSizeInfo {
172     // Free bucket means it has space to allocate a block;
173     // Completely free bucket means it has no block.
174     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
175     private int sizeIndex;
176 
177     BucketSizeInfo(int sizeIndex) {
178       bucketList = new LinkedList<Bucket>();
179       freeBuckets = new LinkedList<Bucket>();
180       completelyFreeBuckets = new LinkedList<Bucket>();
181       this.sizeIndex = sizeIndex;
182     }
183 
184     public synchronized void instantiateBucket(Bucket b) {
185       assert b.isUninstantiated() || b.isCompletelyFree();
186       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
187       bucketList.add(b);
188       freeBuckets.add(b);
189       completelyFreeBuckets.add(b);
190     }
191 
192     public int sizeIndex() {
193       return sizeIndex;
194     }
195 
196     /**
197      * Find a bucket to allocate a block
198      * @return the offset in the IOEngine
199      */
200     public long allocateBlock() {
201       Bucket b = null;
202       if (freeBuckets.size() > 0) // Use up an existing one first...
203         b = freeBuckets.get(freeBuckets.size() - 1);
204       if (b == null) {
205         b = grabGlobalCompletelyFreeBucket();
206         if (b != null) instantiateBucket(b);
207       }
208       if (b == null) return -1;
209       long result = b.allocate();
210       blockAllocated(b);
211       return result;
212     }
213 
214     void blockAllocated(Bucket b) {
215       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
216       if (!b.hasFreeSpace()) freeBuckets.remove(b);
217     }
218 
219     public Bucket findAndRemoveCompletelyFreeBucket() {
220       Bucket b = null;
221       assert bucketList.size() > 0;
222       if (bucketList.size() == 1) {
223         // So we never get complete starvation of a bucket for a size
224         return null;
225       }
226 
227       if (completelyFreeBuckets.size() > 0) {
228         b = completelyFreeBuckets.get(0);
229         removeBucket(b);
230       }
231       return b;
232     }
233 
234     private synchronized void removeBucket(Bucket b) {
235       assert b.isCompletelyFree();
236       bucketList.remove(b);
237       freeBuckets.remove(b);
238       completelyFreeBuckets.remove(b);
239     }
240 
241     public void freeBlock(Bucket b, long offset) {
242       assert bucketList.contains(b);
243       // else we shouldn't have anything to free...
244       assert (!completelyFreeBuckets.contains(b));
245       b.free(offset);
246       if (!freeBuckets.contains(b)) freeBuckets.add(b);
247       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
248     }
249 
250     public synchronized IndexStatistics statistics() {
251       long free = 0, used = 0;
252       for (Bucket b : bucketList) {
253         free += b.freeCount();
254         used += b.usedCount();
255       }
256       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
257     }
258 
259     @Override
260     public String toString() {
261       return Objects.toStringHelper(this.getClass())
262         .add("sizeIndex", sizeIndex)
263         .add("bucketSize", bucketSizes[sizeIndex])
264         .toString();
265     }
266   }
267 
268   // Default block size is 64K, so we choose more sizes near 64K, you'd better
269   // reset it according to your cluster's block size distribution
270   // TODO Support the view of block size distribution statistics
271   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
272       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
273       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
274       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
275       512 * 1024 + 1024 };
276 
277   /**
278    * Round up the given block size to bucket size, and get the corresponding
279    * BucketSizeInfo
280    */
281   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
282     for (int i = 0; i < bucketSizes.length; ++i)
283       if (blockSize <= bucketSizes[i])
284         return bucketSizeInfos[i];
285     return null;
286   }
287 
288   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
289 
290   private final int[] bucketSizes;
291   private final int bigItemSize;
292   // The capacity size for each bucket
293   private final long bucketCapacity;
294   private Bucket[] buckets;
295   private BucketSizeInfo[] bucketSizeInfos;
296   private final long totalSize;
297   private long usedSize = 0;
298 
299   BucketAllocator(long availableSpace, int[] bucketSizes)
300       throws BucketAllocatorException {
301     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
302     int largestBucket = this.bucketSizes[0];
303     for (int i : this.bucketSizes) {
304       largestBucket = Math.max(largestBucket, i);
305     }
306     this.bigItemSize = largestBucket;
307     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
308     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
309     if (buckets.length < this.bucketSizes.length)
310       throw new BucketAllocatorException(
311           "Bucket allocator size too small - must have room for at least "
312               + this.bucketSizes.length + " buckets");
313     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
314     for (int i = 0; i < this.bucketSizes.length; ++i) {
315       bucketSizeInfos[i] = new BucketSizeInfo(i);
316     }
317     for (int i = 0; i < buckets.length; ++i) {
318       buckets[i] = new Bucket(bucketCapacity * i);
319       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
320           .instantiateBucket(buckets[i]);
321     }
322     this.totalSize = ((long) buckets.length) * bucketCapacity;
323   }
324 
325   /**
326    * Rebuild the allocator's data structures from a persisted map.
327    * @param availableSpace capacity of cache
328    * @param map A map stores the block key and BucketEntry(block's meta data
329    *          like offset, length)
330    * @param realCacheSize cached data size statistics for bucket cache
331    * @throws BucketAllocatorException
332    */
333   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
334       AtomicLong realCacheSize) throws BucketAllocatorException {
335     this(availableSpace, bucketSizes);
336 
337     // each bucket has an offset, sizeindex. probably the buckets are too big
338     // in our default state. so what we do is reconfigure them according to what
339     // we've found. we can only reconfigure each bucket once; if more than once,
340     // we know there's a bug, so we just log the info, throw, and start again...
341     boolean[] reconfigured = new boolean[buckets.length];
342     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
343       long foundOffset = entry.getValue().offset();
344       int foundLen = entry.getValue().getLength();
345       int bucketSizeIndex = -1;
346       for (int i = 0; i < bucketSizes.length; ++i) {
347         if (foundLen <= bucketSizes[i]) {
348           bucketSizeIndex = i;
349           break;
350         }
351       }
352       if (bucketSizeIndex == -1) {
353         throw new BucketAllocatorException(
354             "Can't match bucket size for the block with size " + foundLen);
355       }
356       int bucketNo = (int) (foundOffset / bucketCapacity);
357       if (bucketNo < 0 || bucketNo >= buckets.length)
358         throw new BucketAllocatorException("Can't find bucket " + bucketNo
359             + ", total buckets=" + buckets.length
360             + "; did you shrink the cache?");
361       Bucket b = buckets[bucketNo];
362       if (reconfigured[bucketNo]) {
363         if (b.sizeIndex() != bucketSizeIndex)
364           throw new BucketAllocatorException(
365               "Inconsistent allocation in bucket map;");
366       } else {
367         if (!b.isCompletelyFree())
368           throw new BucketAllocatorException("Reconfiguring bucket "
369               + bucketNo + " but it's already allocated; corrupt data");
370         // Need to remove the bucket from whichever list it's currently in at
371         // the moment...
372         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
373         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
374         oldbsi.removeBucket(b);
375         bsi.instantiateBucket(b);
376         reconfigured[bucketNo] = true;
377       }
378       realCacheSize.addAndGet(foundLen);
379       buckets[bucketNo].addAllocation(foundOffset);
380       usedSize += buckets[bucketNo].getItemAllocationSize();
381       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
382     }
383   }
384 
385   public String toString() {
386     StringBuilder sb = new StringBuilder(1024);
387     for (int i = 0; i < buckets.length; ++i) {
388       Bucket b = buckets[i];
389       if (i > 0) sb.append(", ");
390       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
391       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
392     }
393     return sb.toString();
394   }
395 
396   public long getUsedSize() {
397     return this.usedSize;
398   }
399 
400   public long getFreeSize() {
401     return this.totalSize - getUsedSize();
402   }
403 
404   public long getTotalSize() {
405     return this.totalSize;
406   }
407 
408   /**
409    * Allocate a block with specified size. Return the offset
410    * @param blockSize size of block
411    * @throws BucketAllocatorException,CacheFullException
412    * @return the offset in the IOEngine
413    */
414   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
415       BucketAllocatorException {
416     assert blockSize > 0;
417     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
418     if (bsi == null) {
419       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
420         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
421         " to accomodate if size seems reasonable and you want it cached.");
422     }
423     long offset = bsi.allocateBlock();
424 
425     // Ask caller to free up space and try again!
426     if (offset < 0)
427       throw new CacheFullException(blockSize, bsi.sizeIndex());
428     usedSize += bucketSizes[bsi.sizeIndex()];
429     return offset;
430   }
431 
432   private Bucket grabGlobalCompletelyFreeBucket() {
433     for (BucketSizeInfo bsi : bucketSizeInfos) {
434       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
435       if (b != null) return b;
436     }
437     return null;
438   }
439 
440   /**
441    * Free a block with the offset
442    * @param offset block's offset
443    * @return size freed
444    */
445   public synchronized int freeBlock(long offset) {
446     int bucketNo = (int) (offset / bucketCapacity);
447     assert bucketNo >= 0 && bucketNo < buckets.length;
448     Bucket targetBucket = buckets[bucketNo];
449     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
450     usedSize -= targetBucket.getItemAllocationSize();
451     return targetBucket.getItemAllocationSize();
452   }
453 
454   public int sizeIndexOfAllocation(long offset) {
455     int bucketNo = (int) (offset / bucketCapacity);
456     assert bucketNo >= 0 && bucketNo < buckets.length;
457     Bucket targetBucket = buckets[bucketNo];
458     return targetBucket.sizeIndex();
459   }
460 
461   public int sizeOfAllocation(long offset) {
462     int bucketNo = (int) (offset / bucketCapacity);
463     assert bucketNo >= 0 && bucketNo < buckets.length;
464     Bucket targetBucket = buckets[bucketNo];
465     return targetBucket.getItemAllocationSize();
466   }
467 
468   static class IndexStatistics {
469     private long freeCount, usedCount, itemSize, totalCount;
470 
471     public long freeCount() {
472       return freeCount;
473     }
474 
475     public long usedCount() {
476       return usedCount;
477     }
478 
479     public long totalCount() {
480       return totalCount;
481     }
482 
483     public long freeBytes() {
484       return freeCount * itemSize;
485     }
486 
487     public long usedBytes() {
488       return usedCount * itemSize;
489     }
490 
491     public long totalBytes() {
492       return totalCount * itemSize;
493     }
494 
495     public long itemSize() {
496       return itemSize;
497     }
498 
499     public IndexStatistics(long free, long used, long itemSize) {
500       setTo(free, used, itemSize);
501     }
502 
503     public IndexStatistics() {
504       setTo(-1, -1, 0);
505     }
506 
507     public void setTo(long free, long used, long itemSize) {
508       this.itemSize = itemSize;
509       this.freeCount = free;
510       this.usedCount = used;
511       this.totalCount = free + used;
512     }
513   }
514 
515   public Bucket [] getBuckets() {
516     return this.buckets;
517   }
518 
519   void logStatistics() {
520     IndexStatistics total = new IndexStatistics();
521     IndexStatistics[] stats = getIndexStatistics(total);
522     LOG.info("Bucket allocator statistics follow:\n");
523     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
524         + total.usedBytes() + "; total bytes=" + total.totalBytes());
525     for (IndexStatistics s : stats) {
526       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
527           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
528     }
529   }
530 
531   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
532     IndexStatistics[] stats = getIndexStatistics();
533     long totalfree = 0, totalused = 0;
534     for (IndexStatistics stat : stats) {
535       totalfree += stat.freeBytes();
536       totalused += stat.usedBytes();
537     }
538     grandTotal.setTo(totalfree, totalused, 1);
539     return stats;
540   }
541 
542   IndexStatistics[] getIndexStatistics() {
543     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
544     for (int i = 0; i < stats.length; ++i)
545       stats[i] = bucketSizeInfos[i].statistics();
546     return stats;
547   }
548 
549   public long freeBlock(long freeList[]) {
550     long sz = 0;
551     for (int i = 0; i < freeList.length; ++i)
552       sz += freeBlock(freeList[i]);
553     return sz;
554   }
555 
556 }