View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.collections.map.LinkedMap;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
34  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
35  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
36  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
37  
38  import com.google.common.base.Objects;
39  import com.google.common.base.Preconditions;
40  import com.google.common.primitives.Ints;
41  
42  /**
43   * This class is used to allocate a block with specified size and free the block
44   * when evicting. It manages an array of buckets, each bucket is associated with
45   * a size and caches elements up to this size. For a completely empty bucket, this
46   * size could be re-specified dynamically.
47   * 
48   * This class is not thread safe.
49   */
50  @InterfaceAudience.Private
51  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
52  public final class BucketAllocator {
53    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
54  
55    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
56    public final static class Bucket {
57      private long baseOffset;
58      private int itemAllocationSize, sizeIndex;
59      private int itemCount;
60      private int freeList[];
61      private int freeCount, usedCount;
62  
63      public Bucket(long offset) {
64        baseOffset = offset;
65        sizeIndex = -1;
66      }
67  
68      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
69        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
70        this.sizeIndex = sizeIndex;
71        itemAllocationSize = bucketSizes[sizeIndex];
72        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
73        freeCount = itemCount;
74        usedCount = 0;
75        freeList = new int[itemCount];
76        for (int i = 0; i < freeCount; ++i)
77          freeList[i] = i;
78      }
79  
80      public boolean isUninstantiated() {
81        return sizeIndex == -1;
82      }
83  
84      public int sizeIndex() {
85        return sizeIndex;
86      }
87  
88      public int getItemAllocationSize() {
89        return itemAllocationSize;
90      }
91  
92      public boolean hasFreeSpace() {
93        return freeCount > 0;
94      }
95  
96      public boolean isCompletelyFree() {
97        return usedCount == 0;
98      }
99  
100     public int freeCount() {
101       return freeCount;
102     }
103 
104     public int usedCount() {
105       return usedCount;
106     }
107 
108     public int getFreeBytes() {
109       return freeCount * itemAllocationSize;
110     }
111 
112     public int getUsedBytes() {
113       return usedCount * itemAllocationSize;
114     }
115 
116     public long getBaseOffset() {
117       return baseOffset;
118     }
119 
120     /**
121      * Allocate a block in this bucket, return the offset representing the
122      * position in physical space
123      * @return the offset in the IOEngine
124      */
125     public long allocate() {
126       assert freeCount > 0; // Else should not have been called
127       assert sizeIndex != -1;
128       ++usedCount;
129       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
130       assert offset >= 0;
131       return offset;
132     }
133 
134     public void addAllocation(long offset) throws BucketAllocatorException {
135       offset -= baseOffset;
136       if (offset < 0 || offset % itemAllocationSize != 0)
137         throw new BucketAllocatorException(
138             "Attempt to add allocation for bad offset: " + offset + " base="
139                 + baseOffset + ", bucket size=" + itemAllocationSize);
140       int idx = (int) (offset / itemAllocationSize);
141       boolean matchFound = false;
142       for (int i = 0; i < freeCount; ++i) {
143         if (matchFound) freeList[i - 1] = freeList[i];
144         else if (freeList[i] == idx) matchFound = true;
145       }
146       if (!matchFound)
147         throw new BucketAllocatorException("Couldn't find match for index "
148             + idx + " in free list");
149       ++usedCount;
150       --freeCount;
151     }
152 
153     private void free(long offset) {
154       offset -= baseOffset;
155       assert offset >= 0;
156       assert offset < itemCount * itemAllocationSize;
157       assert offset % itemAllocationSize == 0;
158       assert usedCount > 0;
159       assert freeCount < itemCount; // Else duplicate free
160       int item = (int) (offset / (long) itemAllocationSize);
161       assert !freeListContains(item);
162       --usedCount;
163       freeList[freeCount++] = item;
164     }
165 
166     private boolean freeListContains(int blockNo) {
167       for (int i = 0; i < freeCount; ++i) {
168         if (freeList[i] == blockNo) return true;
169       }
170       return false;
171     }
172   }
173 
174   final class BucketSizeInfo {
175     // Free bucket means it has space to allocate a block;
176     // Completely free bucket means it has no block.
177     private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
178     private int sizeIndex;
179 
180     BucketSizeInfo(int sizeIndex) {
181       bucketList = new LinkedMap();
182       freeBuckets = new LinkedMap();
183       completelyFreeBuckets = new LinkedMap();
184       this.sizeIndex = sizeIndex;
185     }
186 
187     public void instantiateBucket(Bucket b) {
188       assert b.isUninstantiated() || b.isCompletelyFree();
189       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
190       bucketList.put(b, b);
191       freeBuckets.put(b, b);
192       completelyFreeBuckets.put(b, b);
193     }
194 
195     public int sizeIndex() {
196       return sizeIndex;
197     }
198 
199     /**
200      * Find a bucket to allocate a block
201      * @return the offset in the IOEngine
202      */
203     public long allocateBlock() {
204       Bucket b = null;
205       if (freeBuckets.size() > 0) {
206         // Use up an existing one first...
207         b = (Bucket) freeBuckets.lastKey();
208       }
209       if (b == null) {
210         b = grabGlobalCompletelyFreeBucket();
211         if (b != null) instantiateBucket(b);
212       }
213       if (b == null) return -1;
214       long result = b.allocate();
215       blockAllocated(b);
216       return result;
217     }
218 
219     void blockAllocated(Bucket b) {
220       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
221       if (!b.hasFreeSpace()) freeBuckets.remove(b);
222     }
223 
224     public Bucket findAndRemoveCompletelyFreeBucket() {
225       Bucket b = null;
226       assert bucketList.size() > 0;
227       if (bucketList.size() == 1) {
228         // So we never get complete starvation of a bucket for a size
229         return null;
230       }
231 
232       if (completelyFreeBuckets.size() > 0) {
233         b = (Bucket) completelyFreeBuckets.firstKey();
234         removeBucket(b);
235       }
236       return b;
237     }
238 
239     private void removeBucket(Bucket b) {
240       assert b.isCompletelyFree();
241       bucketList.remove(b);
242       freeBuckets.remove(b);
243       completelyFreeBuckets.remove(b);
244     }
245 
246     public void freeBlock(Bucket b, long offset) {
247       assert bucketList.containsKey(b);
248       // else we shouldn't have anything to free...
249       assert (!completelyFreeBuckets.containsKey(b));
250       b.free(offset);
251       if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
252       if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
253     }
254 
255     public IndexStatistics statistics() {
256       long free = 0, used = 0;
257       for (Object obj : bucketList.keySet()) {
258         Bucket b = (Bucket) obj;
259         free += b.freeCount();
260         used += b.usedCount();
261       }
262       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
263     }
264 
265     @Override
266     public String toString() {
267       return Objects.toStringHelper(this.getClass())
268         .add("sizeIndex", sizeIndex)
269         .add("bucketSize", bucketSizes[sizeIndex])
270         .toString();
271     }
272   }
273 
274   // Default block size is 64K, so we choose more sizes near 64K, you'd better
275   // reset it according to your cluster's block size distribution
276   // TODO Support the view of block size distribution statistics
277   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
278       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
279       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
280       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
281       512 * 1024 + 1024 };
282 
283   /**
284    * Round up the given block size to bucket size, and get the corresponding
285    * BucketSizeInfo
286    */
287   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
288     for (int i = 0; i < bucketSizes.length; ++i)
289       if (blockSize <= bucketSizes[i])
290         return bucketSizeInfos[i];
291     return null;
292   }
293 
294   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
295 
296   private final int[] bucketSizes;
297   private final int bigItemSize;
298   // The capacity size for each bucket
299   private final long bucketCapacity;
300   private Bucket[] buckets;
301   private BucketSizeInfo[] bucketSizeInfos;
302   private final long totalSize;
303   private long usedSize = 0;
304 
305   BucketAllocator(long availableSpace, int[] bucketSizes)
306       throws BucketAllocatorException {
307     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
308     Arrays.sort(this.bucketSizes);
309     this.bigItemSize = Ints.max(this.bucketSizes);
310     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
311     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
312     if (buckets.length < this.bucketSizes.length)
313       throw new BucketAllocatorException(
314           "Bucket allocator size too small - must have room for at least "
315               + this.bucketSizes.length + " buckets");
316     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
317     for (int i = 0; i < this.bucketSizes.length; ++i) {
318       bucketSizeInfos[i] = new BucketSizeInfo(i);
319     }
320     for (int i = 0; i < buckets.length; ++i) {
321       buckets[i] = new Bucket(bucketCapacity * i);
322       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
323           .instantiateBucket(buckets[i]);
324     }
325     this.totalSize = ((long) buckets.length) * bucketCapacity;
326   }
327 
328   /**
329    * Rebuild the allocator's data structures from a persisted map.
330    * @param availableSpace capacity of cache
331    * @param map A map stores the block key and BucketEntry(block's meta data
332    *          like offset, length)
333    * @param realCacheSize cached data size statistics for bucket cache
334    * @throws BucketAllocatorException
335    */
336   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
337       AtomicLong realCacheSize) throws BucketAllocatorException {
338     this(availableSpace, bucketSizes);
339 
340     // each bucket has an offset, sizeindex. probably the buckets are too big
341     // in our default state. so what we do is reconfigure them according to what
342     // we've found. we can only reconfigure each bucket once; if more than once,
343     // we know there's a bug, so we just log the info, throw, and start again...
344     boolean[] reconfigured = new boolean[buckets.length];
345     int sizeNotMatchedCount = 0;
346     int insufficientCapacityCount = 0;
347     Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
348     while (iterator.hasNext()) {
349       Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
350       long foundOffset = entry.getValue().offset();
351       int foundLen = entry.getValue().getLength();
352       int bucketSizeIndex = -1;
353       for (int i = 0; i < this.bucketSizes.length; ++i) {
354         if (foundLen <= this.bucketSizes[i]) {
355           bucketSizeIndex = i;
356           break;
357         }
358       }
359       if (bucketSizeIndex == -1) {
360         sizeNotMatchedCount++;
361         iterator.remove();
362         continue;
363       }
364       int bucketNo = (int) (foundOffset / bucketCapacity);
365       if (bucketNo < 0 || bucketNo >= buckets.length) {
366         insufficientCapacityCount++;
367         iterator.remove();
368         continue;
369       }
370       Bucket b = buckets[bucketNo];
371       if (reconfigured[bucketNo]) {
372         if (b.sizeIndex() != bucketSizeIndex)
373           throw new BucketAllocatorException(
374               "Inconsistent allocation in bucket map;");
375       } else {
376         if (!b.isCompletelyFree())
377           throw new BucketAllocatorException("Reconfiguring bucket "
378               + bucketNo + " but it's already allocated; corrupt data");
379         // Need to remove the bucket from whichever list it's currently in at
380         // the moment...
381         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
382         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
383         oldbsi.removeBucket(b);
384         bsi.instantiateBucket(b);
385         reconfigured[bucketNo] = true;
386       }
387       realCacheSize.addAndGet(foundLen);
388       buckets[bucketNo].addAllocation(foundOffset);
389       usedSize += buckets[bucketNo].getItemAllocationSize();
390       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
391     }
392 
393     if (sizeNotMatchedCount > 0) {
394       LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
395           + "there is no matching bucket size for these blocks");
396     }
397     if (insufficientCapacityCount > 0) {
398       LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
399           + "did you shrink the cache?");
400     }
401   }
402 
403   public String toString() {
404     StringBuilder sb = new StringBuilder(1024);
405     for (int i = 0; i < buckets.length; ++i) {
406       Bucket b = buckets[i];
407       if (i > 0) sb.append(", ");
408       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
409       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
410     }
411     return sb.toString();
412   }
413 
414   public long getUsedSize() {
415     return this.usedSize;
416   }
417 
418   public long getFreeSize() {
419     return this.totalSize - getUsedSize();
420   }
421 
422   public long getTotalSize() {
423     return this.totalSize;
424   }
425 
426   /**
427    * Allocate a block with specified size. Return the offset
428    * @param blockSize size of block
429    * @throws BucketAllocatorException,CacheFullException
430    * @return the offset in the IOEngine
431    */
432   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
433       BucketAllocatorException {
434     assert blockSize > 0;
435     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
436     if (bsi == null) {
437       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
438         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
439         " to accomodate if size seems reasonable and you want it cached.");
440     }
441     long offset = bsi.allocateBlock();
442 
443     // Ask caller to free up space and try again!
444     if (offset < 0)
445       throw new CacheFullException(blockSize, bsi.sizeIndex());
446     usedSize += bucketSizes[bsi.sizeIndex()];
447     return offset;
448   }
449 
450   private Bucket grabGlobalCompletelyFreeBucket() {
451     for (BucketSizeInfo bsi : bucketSizeInfos) {
452       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
453       if (b != null) return b;
454     }
455     return null;
456   }
457 
458   /**
459    * Free a block with the offset
460    * @param offset block's offset
461    * @return size freed
462    */
463   public synchronized int freeBlock(long offset) {
464     int bucketNo = (int) (offset / bucketCapacity);
465     assert bucketNo >= 0 && bucketNo < buckets.length;
466     Bucket targetBucket = buckets[bucketNo];
467     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
468     usedSize -= targetBucket.getItemAllocationSize();
469     return targetBucket.getItemAllocationSize();
470   }
471 
472   public int sizeIndexOfAllocation(long offset) {
473     int bucketNo = (int) (offset / bucketCapacity);
474     assert bucketNo >= 0 && bucketNo < buckets.length;
475     Bucket targetBucket = buckets[bucketNo];
476     return targetBucket.sizeIndex();
477   }
478 
479   public int sizeOfAllocation(long offset) {
480     int bucketNo = (int) (offset / bucketCapacity);
481     assert bucketNo >= 0 && bucketNo < buckets.length;
482     Bucket targetBucket = buckets[bucketNo];
483     return targetBucket.getItemAllocationSize();
484   }
485 
486   static class IndexStatistics {
487     private long freeCount, usedCount, itemSize, totalCount;
488 
489     public long freeCount() {
490       return freeCount;
491     }
492 
493     public long usedCount() {
494       return usedCount;
495     }
496 
497     public long totalCount() {
498       return totalCount;
499     }
500 
501     public long freeBytes() {
502       return freeCount * itemSize;
503     }
504 
505     public long usedBytes() {
506       return usedCount * itemSize;
507     }
508 
509     public long totalBytes() {
510       return totalCount * itemSize;
511     }
512 
513     public long itemSize() {
514       return itemSize;
515     }
516 
517     public IndexStatistics(long free, long used, long itemSize) {
518       setTo(free, used, itemSize);
519     }
520 
521     public IndexStatistics() {
522       setTo(-1, -1, 0);
523     }
524 
525     public void setTo(long free, long used, long itemSize) {
526       this.itemSize = itemSize;
527       this.freeCount = free;
528       this.usedCount = used;
529       this.totalCount = free + used;
530     }
531   }
532 
533   public Bucket [] getBuckets() {
534     return this.buckets;
535   }
536 
537   void logStatistics() {
538     IndexStatistics total = new IndexStatistics();
539     IndexStatistics[] stats = getIndexStatistics(total);
540     LOG.info("Bucket allocator statistics follow:\n");
541     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
542         + total.usedBytes() + "; total bytes=" + total.totalBytes());
543     for (IndexStatistics s : stats) {
544       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
545           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
546     }
547   }
548 
549   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
550     IndexStatistics[] stats = getIndexStatistics();
551     long totalfree = 0, totalused = 0;
552     for (IndexStatistics stat : stats) {
553       totalfree += stat.freeBytes();
554       totalused += stat.usedBytes();
555     }
556     grandTotal.setTo(totalfree, totalused, 1);
557     return stats;
558   }
559 
560   IndexStatistics[] getIndexStatistics() {
561     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
562     for (int i = 0; i < stats.length; ++i)
563       stats[i] = bucketSizeInfos[i].statistics();
564     return stats;
565   }
566 
567   public long freeBlock(long freeList[]) {
568     long sz = 0;
569     for (int i = 0; i < freeList.length; ++i)
570       sz += freeBlock(freeList[i]);
571     return sz;
572   }
573 
574 }