View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicLong;
29
30  import org.apache.commons.collections.map.LinkedMap;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
35  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
36  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
37  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
38
39  import com.google.common.base.Objects;
40  import com.google.common.base.Preconditions;
41  import com.google.common.primitives.Ints;
42
43  /**
44   * This class is used to allocate a block with specified size and free the block
45   * when evicting. It manages an array of buckets, each bucket is associated with
46   * a size and caches elements up to this size. For a completely empty bucket, this
47   * size could be re-specified dynamically.
48   * 
49   * This class is not thread safe.
50   */
51  @InterfaceAudience.Private
52  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
53  public final class BucketAllocator {
54    private static final Log LOG = LogFactory.getLog(BucketAllocator.class);
55
56    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
57    public final static class Bucket {
58      private long baseOffset;
59      private int itemAllocationSize, sizeIndex;
60      private int itemCount;
61      private int freeList[];
62      private int freeCount, usedCount;
63
64      public Bucket(long offset) {
65        baseOffset = offset;
66        sizeIndex = -1;
67      }
68
69      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
70        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
71        this.sizeIndex = sizeIndex;
72        itemAllocationSize = bucketSizes[sizeIndex];
73        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
74        freeCount = itemCount;
75        usedCount = 0;
76        freeList = new int[itemCount];
77        for (int i = 0; i < freeCount; ++i)
78          freeList[i] = i;
79      }
80
81      public boolean isUninstantiated() {
82        return sizeIndex == -1;
83      }
84
85      public int sizeIndex() {
86        return sizeIndex;
87      }
88
89      public int getItemAllocationSize() {
90        return itemAllocationSize;
91      }
92
93      public boolean hasFreeSpace() {
94        return freeCount > 0;
95      }
96
97      public boolean isCompletelyFree() {
98        return usedCount == 0;
99      }
100
101     public int freeCount() {
102       return freeCount;
103     }
104
105     public int usedCount() {
106       return usedCount;
107     }
108
109     public int getFreeBytes() {
110       return freeCount * itemAllocationSize;
111     }
112
113     public int getUsedBytes() {
114       return usedCount * itemAllocationSize;
115     }
116
117     public long getBaseOffset() {
118       return baseOffset;
119     }
120
121     /**
122      * Allocate a block in this bucket, return the offset representing the
123      * position in physical space
124      * @return the offset in the IOEngine
125      */
126     public long allocate() {
127       assert freeCount > 0; // Else should not have been called
128       assert sizeIndex != -1;
129       ++usedCount;
130       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
131       assert offset >= 0;
132       return offset;
133     }
134
135     public void addAllocation(long offset) throws BucketAllocatorException {
136       offset -= baseOffset;
137       if (offset < 0 || offset % itemAllocationSize != 0)
138         throw new BucketAllocatorException(
139             "Attempt to add allocation for bad offset: " + offset + " base="
140                 + baseOffset + ", bucket size=" + itemAllocationSize);
141       int idx = (int) (offset / itemAllocationSize);
142       boolean matchFound = false;
143       for (int i = 0; i < freeCount; ++i) {
144         if (matchFound) freeList[i - 1] = freeList[i];
145         else if (freeList[i] == idx) matchFound = true;
146       }
147       if (!matchFound)
148         throw new BucketAllocatorException("Couldn't find match for index "
149             + idx + " in free list");
150       ++usedCount;
151       --freeCount;
152     }
153
154     private void free(long offset) {
155       offset -= baseOffset;
156       assert offset >= 0;
157       assert offset < itemCount * itemAllocationSize;
158       assert offset % itemAllocationSize == 0;
159       assert usedCount > 0;
160       assert freeCount < itemCount; // Else duplicate free
161       int item = (int) (offset / (long) itemAllocationSize);
162       assert !freeListContains(item);
163       --usedCount;
164       freeList[freeCount++] = item;
165     }
166
167     private boolean freeListContains(int blockNo) {
168       for (int i = 0; i < freeCount; ++i) {
169         if (freeList[i] == blockNo) return true;
170       }
171       return false;
172     }
173   }
174
175   final class BucketSizeInfo {
176     // Free bucket means it has space to allocate a block;
177     // Completely free bucket means it has no block.
178     private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
179     private int sizeIndex;
180
181     BucketSizeInfo(int sizeIndex) {
182       bucketList = new LinkedMap();
183       freeBuckets = new LinkedMap();
184       completelyFreeBuckets = new LinkedMap();
185       this.sizeIndex = sizeIndex;
186     }
187
188     public synchronized void instantiateBucket(Bucket b) {
189       assert b.isUninstantiated() || b.isCompletelyFree();
190       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
191       bucketList.put(b, b);
192       freeBuckets.put(b, b);
193       completelyFreeBuckets.put(b, b);
194     }
195
196     public int sizeIndex() {
197       return sizeIndex;
198     }
199
200     /**
201      * Find a bucket to allocate a block
202      * @return the offset in the IOEngine
203      */
204     public long allocateBlock() {
205       Bucket b = null;
206       if (freeBuckets.size() > 0) {
207         // Use up an existing one first...
208         b = (Bucket) freeBuckets.lastKey();
209       }
210       if (b == null) {
211         b = grabGlobalCompletelyFreeBucket();
212         if (b != null) instantiateBucket(b);
213       }
214       if (b == null) return -1;
215       long result = b.allocate();
216       blockAllocated(b);
217       return result;
218     }
219
220     void blockAllocated(Bucket b) {
221       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
222       if (!b.hasFreeSpace()) freeBuckets.remove(b);
223     }
224
225     public Bucket findAndRemoveCompletelyFreeBucket() {
226       Bucket b = null;
227       assert bucketList.size() > 0;
228       if (bucketList.size() == 1) {
229         // So we never get complete starvation of a bucket for a size
230         return null;
231       }
232
233       if (completelyFreeBuckets.size() > 0) {
234         b = (Bucket) completelyFreeBuckets.firstKey();
235         removeBucket(b);
236       }
237       return b;
238     }
239
240     private synchronized void removeBucket(Bucket b) {
241       assert b.isCompletelyFree();
242       bucketList.remove(b);
243       freeBuckets.remove(b);
244       completelyFreeBuckets.remove(b);
245     }
246
247     public void freeBlock(Bucket b, long offset) {
248       assert bucketList.containsKey(b);
249       // else we shouldn't have anything to free...
250       assert (!completelyFreeBuckets.containsKey(b));
251       b.free(offset);
252       if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
253       if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
254     }
255
256     public synchronized IndexStatistics statistics() {
257       long free = 0, used = 0;
258       for (Object obj : bucketList.keySet()) {
259         Bucket b = (Bucket) obj;
260         free += b.freeCount();
261         used += b.usedCount();
262       }
263       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
264     }
265
266     @Override
267     public String toString() {
268       return Objects.toStringHelper(this.getClass())
269         .add("sizeIndex", sizeIndex)
270         .add("bucketSize", bucketSizes[sizeIndex])
271         .toString();
272     }
273   }
274
275   // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
276   // reset it according to your cluster's block size distribution
277   // TODO Support the view of block size distribution statistics
278   // TODO: Why we add the extra 1024 bytes? Slop?
279   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
280       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
281       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
282       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
283       512 * 1024 + 1024 };
284
285   /**
286    * Round up the given block size to bucket size, and get the corresponding
287    * BucketSizeInfo
288    */
289   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
290     for (int i = 0; i < bucketSizes.length; ++i)
291       if (blockSize <= bucketSizes[i])
292         return bucketSizeInfos[i];
293     return null;
294   }
295
296   /**
297    * So, what is the minimum amount of items we'll tolerate in a single bucket?
298    */
299   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
300
301   private final int[] bucketSizes;
302   private final int bigItemSize;
303   // The capacity size for each bucket
304   private final long bucketCapacity;
305   private Bucket[] buckets;
306   private BucketSizeInfo[] bucketSizeInfos;
307   private final long totalSize;
308   private long usedSize = 0;
309
310   BucketAllocator(long availableSpace, int[] bucketSizes)
311       throws BucketAllocatorException {
312     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
313     Arrays.sort(this.bucketSizes);
314     this.bigItemSize = Ints.max(this.bucketSizes);
315     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
316     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
317     if (buckets.length < this.bucketSizes.length)
318       throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
319         "); must have room for at least " + this.bucketSizes.length + " buckets");
320     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
321     for (int i = 0; i < this.bucketSizes.length; ++i) {
322       bucketSizeInfos[i] = new BucketSizeInfo(i);
323     }
324     for (int i = 0; i < buckets.length; ++i) {
325       buckets[i] = new Bucket(bucketCapacity * i);
326       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
327           .instantiateBucket(buckets[i]);
328     }
329     this.totalSize = ((long) buckets.length) * bucketCapacity;
330     if (LOG.isInfoEnabled()) {
331       LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
332         ", bucket capacity=" + this.bucketCapacity +
333         "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
334         "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
335     }
336   }
337
338   /**
339    * Rebuild the allocator's data structures from a persisted map.
340    * @param availableSpace capacity of cache
341    * @param map A map stores the block key and BucketEntry(block's meta data
342    *          like offset, length)
343    * @param realCacheSize cached data size statistics for bucket cache
344    * @throws BucketAllocatorException
345    */
346   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
347       AtomicLong realCacheSize) throws BucketAllocatorException {
348     this(availableSpace, bucketSizes);
349
350     // each bucket has an offset, sizeindex. probably the buckets are too big
351     // in our default state. so what we do is reconfigure them according to what
352     // we've found. we can only reconfigure each bucket once; if more than once,
353     // we know there's a bug, so we just log the info, throw, and start again...
354     boolean[] reconfigured = new boolean[buckets.length];
355     int sizeNotMatchedCount = 0;
356     int insufficientCapacityCount = 0;
357     Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
358     while (iterator.hasNext()) {
359       Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
360       long foundOffset = entry.getValue().offset();
361       int foundLen = entry.getValue().getLength();
362       int bucketSizeIndex = -1;
363       for (int i = 0; i < this.bucketSizes.length; ++i) {
364         if (foundLen <= this.bucketSizes[i]) {
365           bucketSizeIndex = i;
366           break;
367         }
368       }
369       if (bucketSizeIndex == -1) {
370         sizeNotMatchedCount++;
371         iterator.remove();
372         continue;
373       }
374       int bucketNo = (int) (foundOffset / bucketCapacity);
375       if (bucketNo < 0 || bucketNo >= buckets.length) {
376         insufficientCapacityCount++;
377         iterator.remove();
378         continue;
379       }
380       Bucket b = buckets[bucketNo];
381       if (reconfigured[bucketNo]) {
382         if (b.sizeIndex() != bucketSizeIndex) {
383           throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
384         }
385       } else {
386         if (!b.isCompletelyFree()) {
387           throw new BucketAllocatorException(
388               "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
389         }
390         // Need to remove the bucket from whichever list it's currently in at
391         // the moment...
392         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
393         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
394         oldbsi.removeBucket(b);
395         bsi.instantiateBucket(b);
396         reconfigured[bucketNo] = true;
397       }
398       realCacheSize.addAndGet(foundLen);
399       buckets[bucketNo].addAllocation(foundOffset);
400       usedSize += buckets[bucketNo].getItemAllocationSize();
401       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
402     }
403
404     if (sizeNotMatchedCount > 0) {
405       LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
406         "there is no matching bucket size for these blocks");
407     }
408     if (insufficientCapacityCount > 0) {
409       LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
410         + "did you shrink the cache?");
411     }
412   }
413 
414   public String toString() {
415     StringBuilder sb = new StringBuilder(1024);
416     for (int i = 0; i < buckets.length; ++i) {
417       Bucket b = buckets[i];
418       if (i > 0) sb.append(", ");
419       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
420       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
421     }
422     return sb.toString();
423   }
424
425   public long getUsedSize() {
426     return this.usedSize;
427   }
428
429   public long getFreeSize() {
430     return this.totalSize - getUsedSize();
431   }
432
433   public long getTotalSize() {
434     return this.totalSize;
435   }
436
437   /**
438    * Allocate a block with specified size. Return the offset
439    * @param blockSize size of block
440    * @throws BucketAllocatorException
441    * @throws CacheFullException
442    * @return the offset in the IOEngine
443    */
444   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
445       BucketAllocatorException {
446     assert blockSize > 0;
447     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
448     if (bsi == null) {
449       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
450         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
451         " to accomodate if size seems reasonable and you want it cached.");
452     }
453     long offset = bsi.allocateBlock();
454
455     // Ask caller to free up space and try again!
456     if (offset < 0)
457       throw new CacheFullException(blockSize, bsi.sizeIndex());
458     usedSize += bucketSizes[bsi.sizeIndex()];
459     return offset;
460   }
461
462   private Bucket grabGlobalCompletelyFreeBucket() {
463     for (BucketSizeInfo bsi : bucketSizeInfos) {
464       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
465       if (b != null) return b;
466     }
467     return null;
468   }
469
470   /**
471    * Free a block with the offset
472    * @param offset block's offset
473    * @return size freed
474    */
475   public synchronized int freeBlock(long offset) {
476     int bucketNo = (int) (offset / bucketCapacity);
477     assert bucketNo >= 0 && bucketNo < buckets.length;
478     Bucket targetBucket = buckets[bucketNo];
479     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
480     usedSize -= targetBucket.getItemAllocationSize();
481     return targetBucket.getItemAllocationSize();
482   }
483
484   public int sizeIndexOfAllocation(long offset) {
485     int bucketNo = (int) (offset / bucketCapacity);
486     assert bucketNo >= 0 && bucketNo < buckets.length;
487     Bucket targetBucket = buckets[bucketNo];
488     return targetBucket.sizeIndex();
489   }
490
491   public int sizeOfAllocation(long offset) {
492     int bucketNo = (int) (offset / bucketCapacity);
493     assert bucketNo >= 0 && bucketNo < buckets.length;
494     Bucket targetBucket = buckets[bucketNo];
495     return targetBucket.getItemAllocationSize();
496   }
497 
498   static class IndexStatistics {
499     private long freeCount, usedCount, itemSize, totalCount;
500
501     public long freeCount() {
502       return freeCount;
503     }
504
505     public long usedCount() {
506       return usedCount;
507     }
508
509     public long totalCount() {
510       return totalCount;
511     }
512
513     public long freeBytes() {
514       return freeCount * itemSize;
515     }
516
517     public long usedBytes() {
518       return usedCount * itemSize;
519     }
520
521     public long totalBytes() {
522       return totalCount * itemSize;
523     }
524
525     public long itemSize() {
526       return itemSize;
527     }
528
529     public IndexStatistics(long free, long used, long itemSize) {
530       setTo(free, used, itemSize);
531     }
532
533     public IndexStatistics() {
534       setTo(-1, -1, 0);
535     }
536
537     public void setTo(long free, long used, long itemSize) {
538       this.itemSize = itemSize;
539       this.freeCount = free;
540       this.usedCount = used;
541       this.totalCount = free + used;
542     }
543   }
544
545   public Bucket [] getBuckets() {
546     return this.buckets;
547   }
548
549   void logStatistics() {
550     IndexStatistics total = new IndexStatistics();
551     IndexStatistics[] stats = getIndexStatistics(total);
552     LOG.info("Bucket allocator statistics follow:\n");
553     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
554         + total.usedBytes() + "; total bytes=" + total.totalBytes());
555     for (IndexStatistics s : stats) {
556       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
557           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
558     }
559   }
560
561   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
562     IndexStatistics[] stats = getIndexStatistics();
563     long totalfree = 0, totalused = 0;
564     for (IndexStatistics stat : stats) {
565       totalfree += stat.freeBytes();
566       totalused += stat.usedBytes();
567     }
568     grandTotal.setTo(totalfree, totalused, 1);
569     return stats;
570   }
571
572   IndexStatistics[] getIndexStatistics() {
573     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
574     for (int i = 0; i < stats.length; ++i)
575       stats[i] = bucketSizeInfos[i].statistics();
576     return stats;
577   }
578
579   public long freeBlock(long freeList[]) {
580     long sz = 0;
581     for (int i = 0; i < freeList.length; ++i)
582       sz += freeBlock(freeList[i]);
583     return sz;
584   }
585
586 }